This is a story of how my understanding of messaging with RabbitMQ evolved from ignorant to usable while I banged my head against the walls of the CAP theorem. I hope it will save some time for someone out there.

Of course we will have guaranteed delivery!

- So, Balint, we hear that you’re building a message bus?

- Well, yes, I had a simple use case in our system and I thought I’d use RabbitMQ.

- Cool, can we use it too?

- Well, let’s see what your use case is. For example, in my use case we don’t have too many messages (5-10/s), but they are quite big (500 kB), we can tolerate a little message loss from time to time, and duplicates don’t really cause a problem for us.

- Really? Well, we have tiny messages, way smaller than that, so that’s great, but unfortunately our system cannot tolerate message loss. Or out-of-order messages. Even in case of a datacenter failure we need guaranteed FIFO per producer. We can tolerate duplicates.

- Hmm…Let me see what I can do…

Not knowing yet that they were asking for very hard things, I started to investigate, and went deep, deep down the rabbit hole.

Yikes! I can lose messages? Really?

I did have distributed systems in college. I think I even got a good grade in it, but it seemingly did not stick too deep. I was happily ignorant and positive. Messaging? Easy peasy! I had used JMS before, and I just had to create a consumer, a producer on the other end, and that’s it. Simplez.

I obviously had close to zero idea about messaging and the guarantees a messaging system can give. And there is a very good chance that there are a lot of software developers out there who have a lot of catching up to do on this as well. I highly recommend revisiting your assumptions about the guarantees - preferably early on in your project!

I started to explore the Internet, and I frowned as I read that you can lose messages with RabbitMQ. In one of my first rounds of googling I stumbled upon Kyle Kingsbury’s “Call me maybe…” posts, including a torture test of a RabbitMQ cluster, and tons of references to papers about distributed failure detection, the CAP theorem, and an amazing article about different levels of consistency (thanks Kyle, I learned a lot). But I started to doubt myself: did I choose the wrong technology?!

I don’t believe it, let me test it!

I couldn’t believe that there is no setup in which you cannot lose messages. So I read more and more, and I evolved my little test program to count messages, check ordering, and detect lost messages. I learned about iptables and started to partition my network in weird ways.

The RabbitMQ documentation is quite precise and careful, but sometimes I would expect a bit more information about which scenarios can fail and how to solve them for a given use case. Their docs on partitions could say more about how to resolve conflicts without losing messages when they happen.

When a network partition happens, you can choose to resolve it manually (ignore mode), or use one of the three automated modes (configured via the cluster_partition_handling parameter):

autoheal: joins partitions automatically and resets the losing partition’s state.

pause_minority: pauses the nodes in partitions smaller than a quorum (i.e. the minority side).

pause_if_all_down: RabbitMQ automatically pauses cluster nodes which cannot reach any of the listed nodes. It’s pretty similar to pause_minority, but lets you play around at the datacenter level a bit more. You’ll still have to decide between autoheal and ignore as the fallback for when the listed nodes end up in different partitions.

As proven by Kyle Kingsbury, when you use RabbitMQ clustering with any of the automated partition-handling modes, there are certain network partitioning scenarios where messages can be lost. I managed to reproduce the loss with both autoheal and (a bit more surprisingly) pause_minority as well.

  • Producers: My tests ran two producers, each publishing 60,000 integers (from 1 to 60,000) with a message property called “account” set to A in publisher-A and B in publisher-B.

  • Consumers:
    • The two consumers were connected to my fanout exchange as primary and secondary consumers, with predefined, ha:all mirrored, durable queues.
    • I added an artificial 2-second wait in the consumers, making them slow, so that some backlog would always build up.
  • Guaranteed message delivery:
    • Publishers were using Publisher Confirms.
    • Consumers were using message acknowledgement with noack=false (manual acks).
    • Messages had the persistent flag set to true.
    • On connection issues, nacks, shutdowns or cancellations on the AMQP channel, both my publishers and consumers reconnected endlessly.
    • The connection factory was given all three nodes.
  • Statistics: On the consumers I kept count of how many times each of the 60,000 numbers arrived. If a number never arrived, it counted as lost, since the publishers were in a loop, waiting for every single message to be confirmed. (A sketch of this setup follows below.)
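
To make the setup concrete, here is a minimal sketch of the publisher and consumer sides, assuming the RabbitMQ Java client (the post doesn’t prescribe a client library). The host names, exchange and queue names are placeholders, and the endless-reconnect loop and the statistics bookkeeping are omitted.

import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ReliabilityTestSketch {

    // Placeholder addresses - the connection factory is given all three cluster nodes.
    static final Address[] NODES = {
            new Address("rabbit1", 5672), new Address("rabbit2", 5672), new Address("rabbit3", 5672)};
    static final String EXCHANGE = "test.fanout";   // predeclared fanout exchange (placeholder name)
    static final String QUEUE = "consumer.primary"; // predeclared durable, ha:all mirrored queue

    // Publisher: publisher confirms + persistent messages, waiting for a confirm after every publish.
    static void publish(String account) throws Exception {
        Connection conn = new ConnectionFactory().newConnection(NODES);
        Channel ch = conn.createChannel();
        ch.confirmSelect(); // enable publisher confirms on this channel
        Map<String, Object> headers = new HashMap<>();
        headers.put("account", account);
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2) // persistent
                .headers(headers)
                .build();
        for (int i = 1; i <= 60_000; i++) {
            ch.basicPublish(EXCHANGE, "", props, Integer.toString(i).getBytes(StandardCharsets.UTF_8));
            ch.waitForConfirmsOrDie(5_000); // block until the broker confirms (or reconnect and retry)
        }
        conn.close();
    }

    // Consumer: manual acknowledgements (noack=false) and an artificial 2-second delay per message.
    static void consume() throws Exception {
        Connection conn = new ConnectionFactory().newConnection(NODES);
        Channel ch = conn.createChannel();
        ch.basicConsume(QUEUE, false /* autoAck off */, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String tag, Envelope env, AMQP.BasicProperties props, byte[] body)
                    throws java.io.IOException {
                try {
                    Thread.sleep(2_000); // slow consumer so a backlog always builds up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // ...record how many times this number was received for this account...
                ch.basicAck(env.getDeliveryTag(), false);
            }
        });
    }
}

Blocking on the confirm after every publish is slow, but it makes “the broker confirmed it” the definition of a sent message, which is what the loss statistics rely on.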

Yes, I can lose messages.

At the core of the message loss is the fact that RabbitMQ clusters don’t tolerate network partitions, due to how Mnesia, the underlying distributed database, works.

When the partition happens and the two publishers keep pushing messages into both active partitions, the loss is easy to explain: the state diverges while the partition is active, and when it heals the two sides would have to be merged.

The surprising one for me was pause_minority mode: if the nodes can’t see each other, they shouldn’t be able to replicate the message, so they shouldn’t ack back to my publisher, and they should also stop…and they do. It’s just that the order of the partitions can lead to a stale partition becoming the master, and when nodes join back to the stale master, RabbitMQ (to be precise, the underlying Mnesia database) chooses to reset the conflicting state. It’s easier to imagine diverging partitions when every node has a publisher, but I managed to reproduce this even with all clients connecting through a load balancer to the same node, whichever node the load balancer found active.

To create the “right” partitioning, I used something like this on rabbit3:

iptables -A INPUT -s ip_address_of_rabbit1 -j DROP 
iptables -A INPUT -s ip_address_of_rabbit2 -j DROP 

This creates firewall rules on my Linux VM to drop all incoming packets from the given machines. In this case rabbit3 will be cut off from rabbit1 and rabbit2, but not from the clients!

The order would be the following:

My cluster: rabbit1, rabbit2, rabbit3 with pause_minority

  1. Publisher A connecting to rabbit1
  2. partition rabbit3 away from rabbit2 and rabbit1 (rabbit3 goes stale pretty quickly; it’s going to be the future “stale node”, >evil laughter<)
  3. partition rabbit1 away from rabbit2 and rabbit3 (at this point the cluster is stopped, the final two minorities are in sync but dead)
  4. partition rabbit2 away from rabbit1 and rabbit3
  5. heal rabbit3 (cluster is still stopped)
  6. heal rabbit2

At step 6 there is an element of chance: the cluster has to come back up with rabbit2 joining rabbit3. No clients are connected to either of them yet; they are still trying to reach rabbit1. The RabbitMQ docs say:

“The winning partition is the one which has the most clients connected (or if this produces a draw, the one with the most nodes; and if that still produces a draw then one of the partitions is chosen in an unspecified way).”

The results:

17% of my messages were lost.

What kind of network partitions are the dangerous ones?

I want to stress how specific these dangerous partitions are.

The interesting network partitions are the ones which separate your cluster nodes but don’t cut off your publishers and consumers. If a partition cuts off your clients as well, the clients will simply fail over to another node.

But if the network partition is partial (as it can be!), it can actually create these inconsistencies.

How about disaster recovery?

Let’s have a look at providing DR for RabbitMQ! If we accept async replication, we accept message loss. So let’s start from a stronger place: we want a guaranteed replica over the WAN for every message. If the replication fails, we want the publisher to know it: “hey, I couldn’t ensure this message is in both datacenters, please retry!”.

Still, here are some patterns if you want to go down this crazy path:

  • Publish the message to both sides with publisher confirms, and define “success” as both being confirmed. In general you’ll end up with a more reliable system when you put the reliability mechanism in the application rather than in the middleware. On the consumer side you’ll receive guaranteed dupes. (A sketch of this pattern follows after the list.)
  • Use a SAN and spin up RabbitMQ manually? Maybe. You’ll be exposed to the weirdnesses of the SAN, and AFAIK even a “synchronous” SAN will turn into “async” mode when it can’t see the other side. And…you’ll have to spin up RabbitMQ manually in the other datacenter…
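
As a sketch of the first pattern (publish to both datacenters, succeed only when both confirm), here is what it could look like with the Java client; the channel wiring, the empty routing key and the timeout are assumptions for illustration, not something prescribed by RabbitMQ:

import com.rabbitmq.client.*;

// Hypothetical sketch: publish the same message to two clusters and succeed only if both confirm.
public class DualDatacenterPublisher {
    private final Channel primary;   // channel to the local cluster
    private final Channel secondary; // channel to the remote (DR) cluster

    DualDatacenterPublisher(Channel primary, Channel secondary) throws java.io.IOException {
        this.primary = primary;
        this.secondary = secondary;
        primary.confirmSelect();     // publisher confirms on both channels
        secondary.confirmSelect();
    }

    // Returns true only if both brokers confirmed the message; the caller retries otherwise.
    boolean publishToBoth(String exchange, byte[] body) {
        AMQP.BasicProperties persistent = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
        try {
            primary.basicPublish(exchange, "", persistent, body);
            secondary.basicPublish(exchange, "", persistent, body);
            primary.waitForConfirmsOrDie(5_000);
            secondary.waitForConfirmsOrDie(5_000);
            return true;  // both datacenters stored the message
        } catch (Exception e) {
            return false; // at least one side did not confirm - retry
        }
    }
}

Every retry after a half-failure republishes to the side that already had the message; that is exactly the “guaranteed dupes” mentioned above, so the consumers have to deduplicate or be idempotent.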

What if we give in to a bit of potential message loss, but we’d like to limit its extent with asynchronous replication? Let’s say we relax the consistency between the two datacenters to be eventual. In this case we still have a couple of options:

  • Federate the exchanges with AMQP to the secondary datacenter + add a TTL on the messages so they won’t pile up endlessly (see the TTL sketch after this list).
  • Federate the exchanges with AMQP to the secondary datacenter + federate the queues across the datacenters: this will definitely duplicate the messages, which you can discard on the consumer side.
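
For the TTL part, a minimal sketch with the Java client could declare the buffering queue with a per-queue message TTL; the queue name, host and the one-hour value are placeholders:

import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;

public class TtlQueueDeclaration {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("rabbit-secondary"); // placeholder host in the secondary datacenter
        Connection conn = factory.newConnection();
        Channel ch = conn.createChannel();
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-message-ttl", 3_600_000); // drop messages older than one hour
        // durable, non-exclusive, non-auto-delete queue with a per-queue message TTL
        ch.queueDeclare("federated.buffer", true, false, false, queueArgs);
        conn.close();
    }
}

The same TTL can also be applied with a policy on the broker instead of queue arguments, which keeps the client code free of it.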

Theory

The CAP theorem says that when you have a network partition and servers stay active on both sides, you have to choose between availability and consistency; you won’t get both. By consistency we mean “linearizability”, and in terms of scope we are talking about “one register”. Think of “one register” as one concept, one invariant, one piece of state: something you could represent with a single variable in a single-threaded process.

What is that “register” in a messaging system? It’s the queue!

Consistency for queues/a system with messaging

The only operation consistency can be defined on in the case of messaging is whether a message has been stored or not: once the message is stored (which can be defined, for example, as “it has replicas on the active nodes”), everybody’s view across the whole system is the same, including the publisher, which receives an ACK, and the consumer, which can consume this message.

By definition, if I have only one node, consistency is satisfied. If I have two nodes, I have to have a replica of every message “immediately” to keep them consistent. Obviously replication takes time and coordination…this is where the CAP theorem comes in.

Availability for queues/a system with messaging

We can think about availability only on the node level: a node is available for messaging when I can publish and consume arbitrary messages.

CP vs AP

So the question is: do we choose consistency, i.e. my cluster is unavailable until the network partition is resolved (e.g. a two-node cluster with pause_minority), or availability, i.e. I might end up with diverging partitions, but at least I can write and read, even if the view across the cluster is not consistent.

FIFO in distributed messaging

Ordering in distributed messaging in the presence of failures is hard. In the case of RabbitMQ, every kind of failover can result in replaying old messages. If there was a partition and we re-merge the diverged partitions holding messages the other side doesn’t have, then even the order is violated, unless an explicit ordering is defined. Now think about how this ordering would work: the consumer would have to wait until every message up to a given point arrived, without a gap. This is possible, but only sensible with a limit on the waiting. Then you’ll have to decide what to do when that limit is hit.
Until this kind of technology is implemented at the broker level (it might be a while, and be suspicious of commercial products promising it), we are left with client-side ordering if it really matters. The best approach is to be able to deal with out-of-order messages: design your system to be resilient to them, and don’t leave the guarantees to the broker. A sketch of what client-side ordering could look like follows.
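
As an illustration of what client-side ordering might look like, here is a hypothetical sketch in Java. It assumes each producer stamps its messages with a per-producer sequence number (as my test publishers effectively did with their 1-60,000 counters), buffers out-of-order deliveries, and gives up on a gap after a bounded amount of buffering. None of this is RabbitMQ functionality; it lives entirely in the consumer.

import java.util.*;

// Hypothetical sketch of client-side, per-producer ordering with a bounded wait.
public class PerProducerReorderBuffer {
    private final Map<String, Long> nextExpected = new HashMap<>();             // account -> next sequence
    private final Map<String, TreeMap<Long, byte[]>> pending = new HashMap<>(); // account -> buffered messages
    private final int maxPending; // how much we buffer behind a gap before giving up on it

    PerProducerReorderBuffer(int maxPending) { this.maxPending = maxPending; }

    // Called for every delivery; returns the messages that can be processed in order right now.
    List<byte[]> accept(String account, long sequence, byte[] body) {
        long expected = nextExpected.getOrDefault(account, 1L);
        if (sequence < expected) return Collections.emptyList(); // duplicate or replay - drop it
        TreeMap<Long, byte[]> buf = pending.computeIfAbsent(account, a -> new TreeMap<>());
        buf.put(sequence, body);
        List<byte[]> ready = new ArrayList<>();
        // Flush the contiguous prefix starting at the expected sequence number.
        while (!buf.isEmpty() && buf.firstKey() == expected) {
            ready.add(buf.pollFirstEntry().getValue());
            expected++;
        }
        // Bounded wait: if too much piles up behind a gap, accept the gap and move on.
        if (buf.size() > maxPending) {
            expected = buf.firstKey(); // treat the gap as lost (or alert / request a replay here)
            while (!buf.isEmpty() && buf.firstKey() == expected) {
                ready.add(buf.pollFirstEntry().getValue());
                expected++;
            }
        }
        nextExpected.put(account, expected);
        return ready;
    }
}

The interesting decision is the last one: when the buffer limit is hit, you either accept the gap as loss, alert, or ask the producer to replay. The broker can’t make that call for you.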

Manual resolution of conflict

I came up with a solution. If I could have convinced people of a non-clustered setup, I would have gone with a single node, but no one wanted to accept the single point of failure. Oh well, maybe I was too transparent; now here we are ;)

I’ve never seen this written down before, so I’ll share what I came up with to handle partitions. I decided to go with a two-node cluster simply because it’s simpler to resolve a conflict between two partitions. There is a load balancer in front of the cluster, which ensures that every client connects to the same node at any one time.

WARNING: there are a couple of very limiting assumptions to this method; failing any of them will result in losing messages:

  • Assumption 1: all your consumers are up and running during the resolution
  • Assumption 2: you can’t have any other restarts than described here
  • Assumption 3: your clients are not creating ad-hoc exchanges/queues/bindings/policies on the broker.
  • Assumption 4: you have a VIP/Loadbalancer in front of your two nodes which can channel every client to the same node always, and fails over between the two nodes.

The partition resolution mode (cluster_partition_handling config parameter) is ignore.

Let’s say that we just recovered from a bigger network disturbance, and now RabbitMQ has the big red message stating that there is a conflicting state: rabbit2 was partitioned away from rabbit1. (It’s relatively easy to get RabbitMQ into this state with the right order of partitioning.)

Step 1 - Recreate the partition

Yes, we have to recreate the network partitioning so that we can see the differences between the partitions. It’s OK to do it on either node. The key thing is to only cut the two nodes off from each other; the outside world should still be able to talk to both of them!

Wait 60 seconds, or as long as it takes to see the failover and the management UI come back.

RABBIT 1 can’t see RABBIT 2, so you can manage the two nodes separately.

Step 2 - Stop all publishers and drain queues

Identify which node your clients are connected to (see Assumption 4); let’s say it’s RABBIT 2.

If you set the memory high watermark to 0 in RabbitMQ (for example with rabbitmqctl set_vm_memory_high_watermark 0), it will block all publishers! This lets the queues drain. Wait until all queues are empty.

Result: RABBIT 2 has no messages left, but everybody is connected to it. The publishers are blocked; from their point of view the node just looks slow.

Step 3 - Fail over to the other node

Now that the queues are drained, it’s safe to stop this node. Stop it entirely. Wait until everyone has failed over nicely to the other node.

Result: RABBIT 2 is stopped, everybody reconnected to RABBIT 1

Step 4 - Restart the node.

You can now heal the partition and restart the node. It will join the running node (the one with all the clients) and lose its own state, but that’s fine: it had nothing interesting on it, since we drained all the queues.

Result: RABBIT 2 joins back the cluster without any issues, and everything’s back to normal. :)

Is this the end of RabbitMQ?

No. Sh*t happens. There is a point where every system breaks. Before that, though, there are always signs which we can monitor, so we can react step by step based on our confidence level. The chance of these weird scenarios is non-zero, so you should think about them. If there is a way in every scenario (besides both datacenters going down unrecoverably) to recover your messages, then the only remaining job is to reduce the probability of the slow, manual scenarios.

It would be great if there were a map of states visualizing the probability of losing a message and the probability of each transition to a different state. That is a fun probability exercise and would be very useful both for operations and for education. It would obviously be parametrizable, starting with your network reliability, the chance of a hardware failure, etc. If I have time one day…

Conclusion

You can use RabbitMQ with HA if you have a plan for remediating network partitions when they happen: either automatically (and thus risking message loss), or manually to save the messages under some strong assumptions, although they might arrive out of order and duplicated (which is normal).

Understand your risks

Think through every scenario and what should be done. And test them. Try them out. Use Jepsen, write your own tool, whatever. Make sure you test everything before you put mission-critical applications on your message bus.

My wishlist for RabbitMQ would be:

  1. Merge strategy which doesn’t delete the loser partition: It would be great if we could have an automated union of messages when joining partitions, even if it doesn’t handle object merging in the first attempt.
  2. Separate statistics DB for partitions: The statistics DB is shared across the nodes even when the network has healed after a partition that resulted in a conflict. This is very confusing; it’s impossible to gauge the situation because both nodes show the same stats.

I have to note that apart from the tortures above, I never had any issues with RabbitMQ; it’s very stable. The nodes went down only when I turned them off or the host went down. No memory leaks or other issues. I found no bugs. The docs are written really precisely, and I’m very grateful that I can work with high-quality open source products, test them, and read and write about them.

What is my decision?

RabbitMQ’s scalability, advanced routing, flexible LDAP plugin and stability make me confident that I can go live with the first project. If I don’t see anything crazy in the next couple of use cases, I will keep it up and running. In the meantime, I’ll have a look at other products, just to gain a deeper understanding of how they work; but if I find something with similar capabilities that doesn’t need manual conflict resolution, I might switch, as the thought of having to wake up at 3 AM to resolve a RabbitMQ network partition is not comforting at all. If nothing is clearly better, then I might automate the fix.

Please do comment and let me know what you think!
