Immutability and Co/Contravariance

After a long long time…

Immutability also empowers type safety.

There are many reasons to use immutable data structures, and you may have heard many of them; for example, immutable data can be safely shared across multiple threads. Apart from all of these awesome benefits, immutability also lets us employ better type systems. Here is an example:

In Java, Lists are not covariant. What does that mean? Long story short, List<Cat> is not a subtype of List<Animal>. If it were, the following code would bark at runtime:

class Animal {}
class Cat extends Animal {}
class Dog extends Animal {}

List<Cat> cats = new ArrayList<>();
List<Animal> animals = cats; // rejected by the Java compiler; imagine it were allowed
animals.add(new Dog());
Cat persian = cats.get(0); // DOG IS NOT A CAT!!!!

Lists (and other containers) not being covariant makes our lives harder. For example, if I have a function that accepts a List<Animal>, I cannot pass a List<Cat> to it; I have to resort to wildcards such as List<? extends Animal>, which makes signatures noisier.

In contrast, in Scala, Lists are covariant, so the following code compiles:

val cats: List[Cat] = List(new Cat(), new Cat())
val animals: List[Animal] = cats

Why is Scala brave enough to let you do that? Because you cannot change animals after declaring it, and that is why, in Scala (and Kotlin), only immutable collections are covariant. So you cannot add any Dog to animals! In other words, an immutable list does not expose any function that consumes an instance of T. If it did, the following could happen:

class List[+T] {
  def consume(a: T, index: Int): Unit = {
     underlying(index) = a
  }
}

val cats: List[Cat] = List(new Cat(), new Cat())
val animals: List[Animal] = cats
animals.consume(new Dog, 0) // change the first cat to a dog!!!

Instead, instances of T are always produced by List member functions. In fancier terms, instances of T always appear in covariant position and never in contravariant position. Even if you define your own covariant class with a member that consumes an instance of the type parameter, it will fail to compile.

class MyClass[+A] {
   def set(a: A): Unit = {} // fails to compile as an instance of A is used in a contravariant position
}

class MyClass[+A] {
   def get(): A = ... // it compiles, type parameter is used in covariant position
}
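For completeness, here is a minimal sketch of the dual case, contravariance, where the type parameter appears only in consuming (input) position (Printer is a made-up example, not a standard library type):

class Printer[-A] {
   def print(a: A): Unit = println(a) // A appears only in contravariant (input) position
}

// A Printer[Animal] can print any Cat, so it can safely stand in for a Printer[Cat]:
val animalPrinter: Printer[Animal] = new Printer[Animal]
val catPrinter: Printer[Cat] = animalPrinter // compiles thanks to contravariance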

Kotlin takes a similar approach to Scala, but what I like more about Kotlin (only in this very specific case) is that, to define covariant or contravariant classes, it uses out and in instead of + and -, which clearly emphasizes producing and consuming.

Building a Simple and Performant DSL in Scala

Scala provides various language constructs with which a wide variety of DSLs can be built: implicits, operators as function names, macros, higher-order functions etc. However, building a DSL is not a trivial task, especially when performance matters. In this blog post, I am going to talk about a simple DSL that I built in Scala to convert objects to a sequence of bytes. As a use case, the resulting byte array can be used to create a hash (e.g. MD5 or SHA) for a piece of data. The DSL’s aim is to be concise and efficient.
Here is a Java way to create a byte array out of a sequence of different values:

final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos);

dos.writeChars("DSL");
dos.writeInt(32);
dos.writeBoolean(true);

dos.close();

final byte[] result = baos.toByteArray();

By running this code, result will contain 11 bytes: 6 bytes representing "DSL" (writeChars writes each character as two bytes), 4 bytes for 32 and a single byte for true. (Had we used writeUTF instead, the string would have been written as modified UTF-8, a two-byte length prefix followed by the 3 UTF-8 bytes, 10 bytes in total.) As it is Java, the code has lots of boilerplate, but it is quite performant. Obviously, if we come up with a concise DSL that eliminates the boilerplate while preserving the performance, we will achieve our goal. So let’s define the desired DSL syntax first.

Usually, APIs become easier to use if they provide meaningful method chaining. Moreover, Scala lets us use operators as function names, which means we can define DSLs that support operator chaining. I would like to chain the ~ operator to create an array of bytes representing all the values that form the chain. Therefore, the above code can be concisely expressed as follows:

val result: Array[Byte] = "DSL" ~ 32 ~ true

To preserve performance, we can keep using DataOutputStream underneath. Thus, we need to create an instance of it, together with an instance of ByteArrayOutputStream as its underlying output stream, at the beginning of the flow. Obviously, the DataOutputStream instance should be closed and the bytes retrieved from the ByteArrayOutputStream instance at the end of the flow. For simplicity, I add the unary operator ~| to our DSL to mark the end of the flow:

val result = "DSL" ~ 32 ~ true ~|

Now we have to figure out how to build this DSL. First, let’s break it into small pieces. Here are a couple of observations:

  1. Since we want to be able to invoke the ~ operator on any object, we have to extend all types to support ~.
  2. At the beginning of the chain, ~ is an operation defined on String that accepts an instance of any type as its argument. As mentioned in the previous point, this operator is defined not just on String but on all types. In our example, the first ~ invocation desugars to "DSL".~(32).
  3. This operation produces an instance of a type which:
    • Supports ~ operation which accepts instances of any type as its argument. In our example, it accepts true.
    • Carries and mutates DataOutputStream and ByteArrayOutputStream created at the beginning of the flow.
    • Supports operation ~| which closes DataOutputStream instance and retrieves bytes from ByteArrayOutputStream instance.

As seen, I differentiated between the first ~ and the rest. The first one is defined on all types and produces an instance of a special type (let’s call it Container). The other ~ operators, however, are defined on Container and produce Container.

First, I start by defining Container. As mentioned before, Container is responsible for carrying and mutating the DataOutputStream and ByteArrayOutputStream instances. Moreover, it should support ~ and ~|. The implementation of ~ will come later. The complete source code can be found here.

case class Container(dos: DataOutputStream, baos: ByteArrayOutputStream) {
    // TODO: def ~

    def ~| : Array[Byte] = {
        dos.close()
        baos.toByteArray
    }
}

object Container {
    def apply() = {
        val baos = new ByteArrayOutputStream()
        val dos = new DataOutputStream(baos)
        new Container(dos, baos)
    }
}

Now, we need a mechanism to abstract over writing different values into the DataOutputStream. Different DataOutputStream methods must be invoked to write different data types into the underlying output stream (e.g. writeInt, writeBoolean etc.). Scala type classes are a perfect way to abstract over these differences while staying extensible. Take a look at the following code snippet:


trait ByteSequenceRepr[A] {
    def writer: (Container, A) => Unit

    def toByteSequence(arg: A): Container =
        toByteSequence(Container(), arg) // Container() builds the fresh stream pair

    def toByteSequence(container: Container, arg: A): Container = {
        writer(container, arg)
        container
    }
}

ByteSequenceRepr is the actual type class, parameterized by the type it knows how to write. It provides a writer function which writes an instance of type A into the DataOutputStream embedded in a Container instance. Additionally, our type class has two other functions which mutate (or create and then mutate) the Container instance using the available writer. Later, I will explain how this type class is going to help us.

Now, let’s implement the missing ~ operator on the Container case class:


...

def ~[A](arg: A)(implicit bsr: ByteSequenceRepr[A]) = bsr.toByteSequence(this, arg)
...

~ accepts an argument of type A and needs an instance of ByteSequenceRepr[A] available in scope. The logic is quite simple: using the available ByteSequenceRepr, the given instance of A is written into the DataOutputStream carried by the Container instance. Of course, you may rewrite this function using a context bound and the implicitly mechanism.
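For reference, the same operator written with a context bound could look like this (a sketch; the behavior is identical):

def ~[A: ByteSequenceRepr](arg: A): Container =
    implicitly[ByteSequenceRepr[A]].toByteSequence(this, arg)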
To make the life of the DSL’s users even easier, we can provide default ByteSequenceRepr implicits for primitive data types. I created the LowPriorityDefaultByteSequenceReprImplicits trait and put the different ByteSequenceRepr instances there (the complete version is here):


trait LowPriorityDefaultByteSequenceReprImplicits {
    implicit val intToByteSequence: ByteSequenceRepr[Int] =
    new ByteSequenceRepr[Int] {
        val writer: (Container, Int) => Unit = _.dos.writeInt(_)
    }

    implicit val stringToByteSequence: ByteSequenceRepr[String] =
    new ByteSequenceRepr[String] {
        val writer: (Container, String) => Unit = _.dos.writeChars(_)
    }

    implicit val booleanToByteSequence: ByteSequenceRepr[Boolean] =
    new ByteSequenceRepr[Boolean] {
        val writer: (Container, Boolean) => Unit = _.dos.writeBoolean(_)
    }
}

object ByteSequenceRepr extends LowPriorityDefaultByteSequenceReprImplicits

The implicits are made available through the ByteSequenceRepr companion object, which mixes in the trait; this way, the default implicits have the lowest priority during implicit lookup. Thus, for example, if users want to provide their own implementation of the String ByteSequenceRepr, they just need to add a new implicit to the scope, and it will take priority over the default one. Here, you can find a detailed explanation of the implicit resolution rules.
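For example, a user could shadow the default String instance with one that writes plain UTF-8 bytes (a hypothetical override, just to illustrate the priority mechanism):

implicit val utf8StringToByteSequence: ByteSequenceRepr[String] =
    new ByteSequenceRepr[String] {
        val writer: (Container, String) => Unit =
            (container, s) => container.dos.write(s.getBytes("UTF-8")) // plain UTF-8, no length prefix
    }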

As mentioned before, in addition to Container, which supports the ~ and ~| operators, all data types should support them too. Implicit functions and implicit classes are two mechanisms in Scala for extending existing APIs without introducing new inherited data types. This is known as the “Pimp my library” pattern:


object Implicits {
    implicit class WithTilde[A](val left: A) extends AnyVal {
        def ~[B](right: B)(implicit seqA: ByteSequenceRepr[A], seqB: ByteSequenceRepr[B]): Container = {
            val container = seqA.toByteSequence(left)
            seqB.toByteSequence(container, right)
        }

        def ~|(implicit seqA: ByteSequenceRepr[A]): Array[Byte] = seqA.toByteSequence(left).~|
    }
}

Let’s have another look at our example:

val result = "DSL" ~ 32 ~ true ~|

The first invocation of ~ is on "DSL", and the argument of this call is 32. Therefore, first, a Container instance is created with "DSL" written into its DataOutputStream field; then it is mutated by writing 32 into it. This happens by calling the two different overloads of toByteSequence on the available ByteSequenceRepr instances inside WithTilde’s ~ implementation. ~ yields a Container instance, so all subsequent ~ invocations (i.e. ~ true) are made on that resulting container. The WithTilde implicit class extends AnyVal, so a new instance of WithTilde is not allocated every time the conversion is needed. Moreover, the Container instance is mutated and passed along instead of being re-created at every step, so the heap is not inflated.
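Put differently, the example chain desugars roughly as follows:

// "DSL" ~ 32 ~ true ~| expands (conceptually) to:
val container: Container = new WithTilde("DSL").~(32) // Container created; "DSL" and 32 written into it
val result: Array[Byte] = container.~(true).~|        // true written; stream closed; bytes returned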

Actually, we are done. By importing Implicits._, we can benefit from our concise DSL. We can easily extend it for any composite data type as well: you just need to provide a ByteSequenceRepr instance for that type in scope:


case class Point(x: Int, y: Int)

implicit val pointToByteSequence: ByteSequenceRepr[Point] =
new ByteSequenceRepr[Point] {
    val writer: (Container, Point) => Unit = { (container, point) =>
        container.dos.writeInt(point.x)
        container.dos.writeInt(point.y)
    }
}

val result = Point(12, 15) ~ "Hello" ~ 127 ~ 123L ~ true ~|

Our DSL has a very small memory overhead compared to the pure Java approach, due to the intermediate Container objects being created. However, since we instantiate Container just once per chain, the overhead is negligible.

We can go even further by using shapeless to define a general ByteSequenceRepr that converts any case class to a sequence of bytes. The basic idea is that shapeless provides the HList data type to model heterogeneous lists. Additionally, shapeless supports a mechanism to implicitly convert any case class to an HList. So, if you provide an implicit ByteSequenceRepr for HList, you can use it to implicitly convert any case class to an array of bytes. With that in place, you would not need to provide an implicit ByteSequenceRepr for the Point case class in the above example. Of course, it is not without cost: because of those intermediate implicit conversions, memory consumption will increase. The complete source code of the shapeless version can also be found here. I am not going into more details of shapeless, but you may get the basic idea by reading this.
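To give an idea of how that could look, here is a minimal sketch assuming shapeless 2.x (hnilRepr, hconsRepr and genericRepr are illustrative names, not part of the library):

import shapeless.{::, Generic, HList, HNil}

implicit val hnilRepr: ByteSequenceRepr[HNil] =
    new ByteSequenceRepr[HNil] {
        val writer: (Container, HNil) => Unit = (_, _) => () // nothing to write
    }

implicit def hconsRepr[H, T <: HList](implicit hRepr: ByteSequenceRepr[H],
                                      tRepr: ByteSequenceRepr[T]): ByteSequenceRepr[H :: T] =
    new ByteSequenceRepr[H :: T] {
        val writer: (Container, H :: T) => Unit = { (container, list) =>
            hRepr.toByteSequence(container, list.head) // write the head...
            tRepr.toByteSequence(container, list.tail) // ...then recurse on the tail
        }
    }

implicit def genericRepr[A, R](implicit gen: Generic.Aux[A, R],
                               repr: ByteSequenceRepr[R]): ByteSequenceRepr[A] =
    new ByteSequenceRepr[A] {
        val writer: (Container, A) => Unit =
            (container, a) => repr.toByteSequence(container, gen.to(a)) // case class -> HList
    }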

Drawback

Although the DSL is concise and performant, it is not fold-friendly. If there is a list containing different objects and the goal is to create a single byte array out of all of them, we have to do it as follows:


val list = List("DSL", 32, true)

val result = list.foldLeft(Container())(_ ~ _) ~|

So it leaks some of the underlying structure (Container), which is not ideal.

Conclusion

Building a custom DSL should be done carefully, especially from the memory management point of view. If you use several internal DSLs in your server-side application, their combined memory usage under load may negatively impact server performance. Moreover, although we want to preserve immutability at the surface, sometimes, to improve performance, we may use a mutable data structure underneath; it just has to be properly encapsulated.

Scala Streams: A Deeper Look

To start, let’s take a look at the following code:

def process(xs: => Traversable[Int]) = xs.map(_ + 10).filter(_ % 3 == 0).map(_ * 2)

The process function is just a chain of transformations on the given Traversable. Each of these transformations creates an intermediate traversable that is passed to the next transformation function. Given xs = Array(1, 2, 5), the manual trace would be:

1 is increased by 10 so transformed to 11
2 is increased by 10 so transformed to 12
5 is increased by 10 so transformed to 15
11 is filtered out because it is not a multiple of 3
12 is kept because it is a multiple of 3
15 is kept because it is a multiple of 3
12 is multiplied by 2 so transformed to 24
15 is multiplied by 2 so transformed to 30

In imperative programming style, this computation can be done “easily” with a while loop that transforms each element of xs into the final desired value in one go (a sketch follows the trace below). No intermediate traversable is created, which means less memory allocation. In this case, the manual trace would be:

1 is increased by 10 so transformed to 11
11 is filtered out because it is not a multiple of 3
2 is increased by 10 so transformed to 12
12 is kept because it is a multiple of 3
12 is multiplied by 2 so transformed to 24
5 is increased by 10 so transformed to 15
15 is kept because it is a multiple of 3
15 is multiplied by 2 so transformed to 30
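Here is a minimal sketch of that one-pass loop (processImperative is an illustrative name):

def processImperative(xs: Array[Int]): Array[Int] = {
   val buffer = scala.collection.mutable.ArrayBuffer.empty[Int]
   var i = 0
   while (i < xs.length) {
      val y = xs(i) + 10      // map(_ + 10)
      if (y % 3 == 0)         // filter(_ % 3 == 0)
         buffer += y * 2      // map(_ * 2)
      i += 1
   }
   buffer.toArray
}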

How can we keep the compositional style (composing higher-order functions like map, filter etc.) without creating intermediate results, so that memory is used more efficiently?

This can be achieved by using non-strict data structures. Non-strict (or lazy) data structures defer a computation until it is needed. In Scala, Stream is an implementation of a non-strict sequence. Here is a simplified version of the Stream implementation:

trait Stream[+A]
case class Cons[+A](head: A, tail: () => Stream[A]) extends Stream[A]
case object Empty extends Stream[Nothing]  

As seen above, the main difference between Stream and strict sequences such as List is that the tail is a function instead of a strict value. So, the tail is not evaluated until it is forced. Let’s play with our Stream data structure to see how it works.

First, let’s create a convenient apply function:

object Stream {
   def apply[A](xs: A*): Stream[A] = if (xs.isEmpty) Empty else Cons(xs.head, () => apply(xs.tail:_*))
}
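For example, constructing a stream builds only the first cell:

val s = Stream(1, 2, 5)
// s is Cons(1, <thunk>); no further Cons cells are built until the tail is forced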

Now let’s add a simple version of map and filter to our Stream trait.

trait Stream[+A] {
   def map[B](f: A => B): Stream[B] = this match {
      case Empty => Empty
      case Cons(h, t) => Cons(f(h), () => t().map(f))
   }

   def filter(f: A => Boolean): Stream[A] = this match {
      case Empty => Empty
      case Cons(h, t) =>
         if (f(h)) 
            Cons(h, () => t().filter(f)) 
         else 
            t().filter(f)
  }
}

Now, let’s partially trace our motivating example:

Stream(1, 2, 5).map(_ + 10).filter(_ % 3 == 0).map(_ * 2)

Upon the invocation of the first map, Stream(1, 2, 5) matches the Cons case of map, so Cons(11, function0) is created. The function0 notation denotes a function that, when invoked, calls map(_ + 10) on the tail of the original stream. Next, filter is invoked on the result of the first step, Cons(11, function0), which matches the Cons case of filter. Since 11 is not a multiple of 3, it should be filtered out, so the else branch executes and function0 is invoked and evaluated. We therefore enter the map function again, match the Cons case again, and Cons(12, function1) is created. Executing filter on this intermediate result matches the Cons case once more, but this time, since 12 is a multiple of 3, it is not filtered out, so Cons(12, function2) is created. Now, map(_ * 2) will be executed on this intermediate result, and so on and so forth.

As you see:

  • Intermediate results are of type Stream, with only the head evaluated and the rest left unevaluated.
  • The order of the transformations is the same as in the while loop.

This is why streams are sometimes referred to as first-class loops [1]. It seems that we should use this first-class loop instead of strict collections whenever we have a chain of transformations. Is that true?

To answer this question, I benchmarked Scala standard Stream against Array from both memory and throughput aspects. The benchmark source code can be found here.

For the memory usage comparison, I implemented the MemoryUsage application and monitored the memory usage of the different approaches with VisualVM.

I increased the size of data to 50 million elements to make the result more visible in VisualVM. Here is the result:

(Figure: Streaming vs Collection — heap usage observed in VisualVM)

As seen, applying the aforementioned chain of transformations to Array needs a considerably bigger heap. However, when the data set is smaller, the difference is less tangible. But memory usage is just one side of the coin. What about throughput?

The Stream data structure heavily uses memoization. This means that the tail thunks passed to the constructor are cached in lazy vals to prevent re-computation. Although lazy vals have been designed exactly for this purpose, they perform worse than plain vals. So we can expect the throughput of stream transformations to be lower than that of arrays (or strict collections in general). To test this, I ran a micro-benchmark using jmh and sbt-jmh, comparing Array and Stream with 10000 elements; the benchmark invocation follows the illustration below.
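As a minimal illustration of the caching behavior that lazy vals provide (MemoizedTail is a made-up class, not the actual Stream internals):

class MemoizedTail[A](val head: A, tailThunk: () => List[A]) {
   lazy val tail: List[A] = tailThunk() // the thunk runs at most once; the result is cached
}

val cell = new MemoizedTail(1, () => { println("evaluating tail"); List(2, 3) })
cell.tail // prints "evaluating tail" and returns List(2, 3)
cell.tail // served from the cache; prints nothing

The benchmark setup: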

sbt "run -i 5 -wi 5 -f1 -t1 \"StreamingBenchmark\""

And here is the result:

[info]
[info] # Run complete. Total time: 00:00:20
[info]
[info] Benchmark                             Mode  Cnt     Score     Error  Units
[info] StreamingBenchmark.streaming         thrpt    5   880.009 ±  75.244  ops/s
[info] StreamingBenchmark.strictProcessing  thrpt    5  4078.819 ± 276.049  ops/s

As seen, for the aforementioned transformation chain, Stream throughput is around 4 times lower than Array throughput.

Conclusion

Although Streams eliminate intermediate collections upon transformations, they have considerably lower throughput compared to Array (of course, Array is optimized for traversal, but I also ran the benchmark against other strict data structures like Vector and List, and in all cases Stream performed worse, with different ratios). Stream and other non-strict data types are great data structures, but they should be used when their characteristics are really needed.

[1] Chiusano, P. and Bjarnason, R. (2015). Functional Programming in Scala. Manning.

Scala Futures with Timeout

Scala Futures do not support a built-in timeout unless you block on the future using scala.concurrent.Await as follows:


import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

lazy val f = Future { Thread.sleep(2000); true }
Await.result(f, 1 second)

The above code will time out:

java.util.concurrent.TimeoutException: Futures timed out after [1 second]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
...

Sometimes non-blocking external libraries provide futures as their functions’ return type, but do not let you specify a timeout for those functions. Imagine you want to ping a database service but do not want to wait forever for the ping result. For example, the scalapenos client library for Riak provides a ping function returning Future[Boolean], but you cannot provide any timeout for that function. Since it uses the Riak HTTP API and connects to it using spray-client pipelining, it relies on the default sendReceive timeout, which is 60 seconds.

But as you might have heard many times, blocking on a future is not recommended because it wastes a thread. Instead, a non-blocking timeout for a future can be achieved using the akka after pattern:


import java.util.concurrent.TimeoutException

import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.actor.ActorSystem
import akka.pattern.after

val system = ActorSystem("theSystem")

lazy val f = Future { Thread.sleep(2000); true }
lazy val t = after(duration = 1 second, using = system.scheduler)(Future.failed(new TimeoutException("Future timed out!")))

val fWithTimeout = Future firstCompletedOf Seq(f, t)

fWithTimeout.onComplete {
   case Success(x) => println(x)
   case Failure(error) => println(error)
}

The logic is quite self-explanatory. According to the akka documentation, after returns a future that will be completed with the success or failure of the provided value after the specified duration. So, in the above example, t will fail after 1 second with a TimeoutException. Therefore, fWithTimeout will hold the result of t if it completes faster than f, meaning that f has not completed within the expected time.

Notice that, in order to have precise timing, the computation of the future provided to the after function should not have started in advance. For example, in the above code, if you want to extract

Future.failed(new TimeoutException("Future timed out!"))

into a variable, you have to define it as a lazy val, not a val. Passing the future to after inline, as I did, is also fine because the parameter is a by-name parameter.
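As a sketch of the difference (someExpensiveCall is a hypothetical function, used here only for illustration):

// A val would start the computation immediately, defeating the timing:
// val eager: Future[Int] = Future { someExpensiveCall() }

// A lazy val (or an inline expression) defers the computation until `after`
// evaluates its by-name parameter, i.e. when the duration elapses:
lazy val deferred: Future[Int] = Future { someExpensiveCall() }
val timed = after(1.second, system.scheduler)(deferred)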

The downside of this approach is its dependency on akka. Here you will find a lower-level approach using pure Scala (plus Netty’s HashedWheelTimer scheduler) in which the after pattern is mimicked.

To make timeout-enabled futures easier to use, we can simply extend Scala futures with timeout support via an implicit class (the “Pimp my Library” pattern):


import scala.concurrent._
import scala.concurrent.duration.FiniteDuration
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import akka.pattern.after

implicit class FutureExtensions[T](f: Future[T]) {
  def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration, system: ActorSystem): Future[T] = {
      Future firstCompletedOf Seq(f, after(duration, system.scheduler)(Future.failed(timeout)))
  }
}

Now we can enable timeout on any future easily:


import java.util.concurrent.TimeoutException

import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{ Success, Failure }
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem

implicit val system = ActorSystem("theSystem")
implicit val timeout: FiniteDuration = 1 second

lazy val f = Future { Thread.sleep(2000); true }

f withTimeout new TimeoutException("Future timed out!") onComplete {
  case Success(x) => println(x)
  case Failure(error) => println(error)
}

As mentioned earlier, this solution depends on akka, so it fits applications that already use an akka actor system perfectly.

RabbitMQ Exchange Federation in Multiple AWS Availability Zones

Federation is one of the ways a software system can benefit from multiple RabbitMQ brokers distributed over different machines. Clustering and the shovel are two other ways to provide distributed brokers. Choosing between these approaches completely depends on your context and what you want to achieve. In this blog post, I am going to focus on the exchange federation feature of RabbitMQ to achieve a highly available messaging system spread over multiple AWS availability zones. Whether this is the right choice or not will be discussed at the very end.

RabbitMQ provides two types of federation: exchange and queue. The focus here is the former. Imagine the goal is to have three RabbitMQ instances spread over three different availability zones in the AWS Ireland region, so that if any of these availability zones has a connectivity problem or temporarily goes down, the messaging system keeps working. Let’s think about a scenario where there is a direct exchange to which different publishers can publish messages; each message, based on its routing key, will be delivered to one or more queues. In a distributed broker architecture, it should be possible for a message published to an exchange on one node to be received by a bound queue on another node. Exchange federation makes this possible by defining other nodes as upstreams of the current node. So if nodes ‘a’, ‘b’ and ‘c’ form the RabbitMQ fleet, it is possible, using exchange federation, to define ‘b’ and ‘c’ as the upstreams of node ‘a’. Then any message published to an exchange on ‘b’ or ‘c’ will be replicated on the corresponding exchange on node ‘a’.

I assume that you have already set up your RabbitMQ instances on AWS and defined a load balancer in front of them; a good walkthrough can be found here. If you want to achieve high availability, setting up a load balancer in front of the RabbitMQ instances is good practice because it makes the broker distribution transparent to the message publishers.

Now it is time to set up the exchange federation between the RabbitMQ nodes. Federation is a RabbitMQ plugin that ships with the RabbitMQ distribution. For this experiment, I installed rabbitmq-server-3.3.4-1 on EC2 instances with the Amazon Linux AMI.

The federation plugin is not enabled by default, so the first step is to enable it.

rabbitmq-plugins enable rabbitmq_federation

The RabbitMQ server needs to be restarted after enabling the federation plugin for it to take effect. Afterwards, the upstream links can be defined to form the federation topology. The topology can vary based on the purpose: it can be a ring, fan-out, complete graph etc. More details about the different federation topologies can be found here. Since we want a completely transparent, highly available RabbitMQ fleet, a complete graph topology should be set up. Imagine the RabbitMQ fleet consists of the following 3 instances:

  1. ip-172-30-5-106
  2. ip-172-30-4-187
  3. ip-172-30-3-13

Since upstream links are one-directional, 6 upstream links are needed to form a complete graph between these nodes. To define nodes ‘b’ and ‘c’ as upstream nodes of node ‘a’, assuming that RabbitMQ is running on port 5672, two new federation upstream parameters should be defined on node ‘a’ as follows:

rabbitmqctl set_parameter federation-upstream upstreamB '{"uri":"amqp://ip-172-30-4-187:5672","max-hops":1}'

rabbitmqctl set_parameter federation-upstream upstreamC '{"uri":"amqp://ip-172-30-3-13:5672","max-hops":1}'

By executing the above commands, two new parameters called upstreamB and upstreamC are defined for the federation-upstream component, each defining an upstream URI with max-hops set to 1. With max-hops set to 1, downstream nodes cannot replicate the messages they receive from upstream nodes onto their own downstream nodes. In a complete graph topology, max-hops should be set to 1; otherwise, nodes can receive back the messages that they themselves previously replicated to their downstream nodes. In a ring topology of N nodes, on the other hand, max-hops should be set to N-1 to let all nodes receive all messages.

Note that, by defining upstreams, we let brokers connect to each other remotely. Since, by default, it is not possible to connect to a broker remotely via the ‘guest’ user, the RabbitMQ configuration should be altered to allow remote access for the ‘guest’ user. This can be achieved by setting the loopback_users configuration item to []:

[{rabbit, [{loopback_users, []}]}]

The above approach to letting RabbitMQ nodes access each other is safe if the nodes are located in private subnets inside a VPC. If the nodes are accessible via the internet, you need a more secure approach; for example, you may configure dedicated RabbitMQ users with specific access rights.

The federation upstream parameters should be defined on nodes ‘b’ and ‘c’ in the same way as on node ‘a’ to form the complete graph. After that, we should declare that we are interested in applying federation at the exchange level (and not at the queue level):

rabbitmqctl set_policy --apply-to exchanges federate-me "MyExchange" '{"federation-upstream-set":"all"}'

The above command defines a policy called “federate-me” which applies federation to all exchanges named “MyExchange”. The exchange name is a regular expression, so it may cover all exchanges whose names match that expression. As you can see, federation-upstream-set is set to ‘all’. This means that we want to apply this policy to all the upstream nodes we have previously defined for the current node. You may also group the upstream nodes and define different policies for different groups by setting the ‘federation-upstream-set’ parameter to the name of a group.

Again, you need to execute the above set_policy command on all nodes. After that, the federation setup is complete. You can check the federation status using the RabbitMQ command line tool as follows:

sudo rabbitmqctl eval 'rabbit_federation_status:status().'

Federation vs Clustering in Cloud Environments

Performance

As mentioned here, performance tests using PerfTest show that federation throughput is considerably lower than clustering throughput. The reason is that federation works at a higher abstraction level than clustering.

Reliability

RabbitMQ clustering does not tolerate network partitions well, so it is recommended for cases where brokers are connected via reliable LAN links. Whether the connections between AZs are reliable is debatable. Here it has been mentioned that clustering is fine where brokers are distributed across multiple AZs. In my personal experience, in a production system running a RabbitMQ cluster of 3 nodes distributed over 3 AZs for 6 months, we observed the cluster become partitioned once due to a connection loss between AZs. If, in the context of your application, this level of unreliability is not acceptable, using federation is recommended. If you use clustering instead, the monitoring system should immediately detect that the cluster is partitioned, and recovery can be done automatically by re-joining the nodes to the cluster.

Setting Up

Setting up a federated RabbitMQ fleet is more complicated than setting up a RabbitMQ cluster. Specifically, upon scaling up, the existing nodes must be re-configured to use the newly added node as a new upstream. This means that setting up an auto scaling group for a federated RabbitMQ fleet is not a trivial task.

Summary

Federation is a RabbitMQ plugin for setting up distributed brokers. Generally, in reliable networks, clustering is preferred due to its higher throughput and easier setup. Although the links between AWS AZs are not as reliable as the network inside a single AZ, with an appropriate monitoring system in place, clustering is still preferred over federation. That said, the choice ultimately depends on your application context and the level of unreliability your application can accept.