Scala Futures with Timeout

Scala futures do not support a built-in timeout unless you block on the future using scala.concurrent.Await, as follows:


import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

lazy val f = Future { Thread.sleep(2000); true }
Await.result(f, 1.second) // blocks the calling thread for up to 1 second

The above code times out with:

java.util.concurrent.TimeoutException: Futures timed out after [1 second]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
...

Non-blocking external libraries sometimes return futures from their functions without letting you specify any timeout. Imagine you want to ping a database service, but you do not want to wait forever for the ping result. For example, the scalapenos client library for Riak provides a ping function returning Future[Boolean], but you cannot pass a timeout to it. Since it talks to the Riak HTTP API through spray-client pipelining, it relies on the default sendReceive timeout, which is 60 seconds.

However, blocking on a future is not recommended because it ties up a thread while waiting. A non-blocking way to add a timeout to a future is Akka's after pattern:


import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.actor.ActorSystem
import akka.pattern.after

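val system = ActorSystem("theSystem")

// f needs 2 seconds to complete
lazy val f = Future { Thread.sleep(2000); true }
// t fails with a TimeoutException once 1 second has elapsed
lazy val t = after(duration = 1.second, using = system.scheduler)(Future.failed(new TimeoutException("Future timed out!")))

// completes with the result of whichever of f and t completes first
val fWithTimeout = Future firstCompletedOf Seq(f, t)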

fWithTimeout.onComplete {
   case Success(x) => println(x)
   case Failure(error) => println(error)
}

The logic is quite self-explanatory. According to the Akka documentation, after returns a future that will be completed with the success or failure of the provided value after the specified duration. So, in the above example, t fails with a TimeoutException after 1 second. Future.firstCompletedOf returns a future holding the result of whichever of f and t completes first, so if t completes first, f has not completed within the expected time and fWithTimeout fails with the TimeoutException.

Notice that, to keep the timing under the control of the timer, the future passed to after should not be evaluated in advance. For example, in the above code, if you want to extract

Future.failed(new TimeoutException("Future timed out!"))

to a variable, define it as a lazy val and not a val. Passing the future to after inline, as I did, is also fine because the parameter is a by-name parameter, which after evaluates only when the duration has elapsed.

The downside of this approach is its dependency on Akka. Here you will find a lower-level approach using pure Scala (plus Netty's HashedWheelTimer as the scheduler) that mimics the after pattern.
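The lower-level approach mentioned above uses Netty's HashedWheelTimer. As a rough, dependency-free sketch of the same idea, the code below swaps in the JDK's ScheduledExecutorService as the timer; it is not the code from that post, and the names PlainTimeout, after and withTimeout are hypothetical. The after method mirrors the Akka pattern by taking its value by name and evaluating it only when the timer fires.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

object PlainTimeout {
  // One daemon thread is enough: timer tasks only complete a promise.
  // (Stand-in for Netty's HashedWheelTimer; the daemon flag lets the JVM exit.)
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "future-timeout-scheduler")
        t.setDaemon(true)
        t
      }
    })

  // Mimics akka.pattern.after: `value` is by-name and is only evaluated when the timer fires.
  def after[T](delay: FiniteDuration)(value: => Future[T]): Future[T] = {
    val p = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit =
        try p.completeWith(value)
        catch { case NonFatal(e) => p.tryFailure(e) }
    }, delay.toNanos, TimeUnit.NANOSECONDS)
    p.future
  }

  // Races f against a future that fails with a TimeoutException after `delay`.
  // Note: f itself is not cancelled; we merely stop waiting for it.
  def withTimeout[T](f: Future[T], delay: FiniteDuration)(implicit ec: ExecutionContext): Future[T] =
    Future.firstCompletedOf(Seq(f, after(delay)(Future.failed[T](new TimeoutException(s"Future timed out after $delay")))))
}
```

Usage follows the Akka version: PlainTimeout.withTimeout(f, 1.second) fails with a TimeoutException if f takes longer than a second. As with firstCompletedOf in general, the slow computation keeps running in the background.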

To make timeouts easier to use, we can extend Scala futures with an implicit class (the “Pimp my Library” pattern):


import scala.concurrent._
import scala.concurrent.duration.FiniteDuration
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import akka.pattern.after

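// In Scala 2, an implicit class cannot be top-level: put it inside an object
// (and import it where needed) unless you paste it into the REPL.
implicit class FutureExtensions[T](f: Future[T]) {
  // Races f against a future that fails with `timeout` after the implicit duration
  def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration, system: ActorSystem): Future[T] = {
    Future firstCompletedOf Seq(f, after(duration, system.scheduler)(Future.failed(timeout)))
  }
}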

Now we can enable timeout on any future easily:


import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{ Success, Failure }
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem

implicit val system: ActorSystem = ActorSystem("theSystem")
implicit val timeout: FiniteDuration = 1.second

lazy val f = Future { Thread.sleep(2000); true }

// FutureExtensions (defined above) must be in scope for withTimeout to resolve
f.withTimeout(new TimeoutException("Future timed out!")).onComplete {
  case Success(x) => println(x)
  case Failure(error) => println(error)
}

As mentioned earlier, this solution depends on Akka, so it fits best in applications that already use an Akka actor system.


RabbitMQ Exchange Federation in Multiple AWS Availability Zones

Federation is one of the ways a software system can benefit from having multiple RabbitMQ brokers distributed on different machines. Clustering and the Shovel plugin are two other ways to provide distributed brokers. Choosing between these approaches depends entirely on your context and what you want to achieve. In this blog post, I focus on RabbitMQ's exchange federation feature to build a highly available messaging system spread across multiple AWS availability zones. Whether this is the right choice is discussed at the very end.

RabbitMQ provides two types of federation: exchange federation and queue federation. The focus here is the former. Imagine the goal is to have three RabbitMQ instances spread across three different availability zones in the AWS Ireland region, so that if any of these availability zones has connectivity problems or temporarily goes down, the messaging system keeps working. Consider a scenario with a direct exchange to which different publishers publish messages; each message is delivered, based on its routing key, to one or more queues. In a distributed broker architecture, it should be possible to publish a message to an exchange on one node and receive it on a bound queue on another node. Exchange federation makes this possible by defining other nodes as upstreams of the current node. So if nodes ‘a’, ‘b’ and ‘c’ make up the RabbitMQ fleet, exchange federation lets us define ‘b’ and ‘c’ as the upstreams of node ‘a’. Any message published to the federated exchange on ‘b’ or ‘c’ is then replicated to the corresponding exchange on node ‘a’.

I assume that you have already set up your RabbitMQ instances on AWS and put a load balancer in front of them. Good instructions for doing that can be found here. If you want to achieve high availability, a load balancer in front of the RabbitMQ instances is good practice because it makes the distribution of brokers transparent to message publishers.

Now it is time to set up exchange federation between the RabbitMQ nodes. Federation is a plugin that ships with the RabbitMQ distribution. For this experiment, I installed rabbitmq-server-3.3.4-1 on EC2 instances running the Amazon Linux AMI.

The federation plugin is not enabled by default, so the first step is to enable it.

rabbitmq-plugins enable rabbitmq_federation

The RabbitMQ server needs to be restarted after enabling the federation plugin for it to take effect. Afterwards, the upstream links can be defined to form the federation topology. The topology can vary depending on the purpose: ring, fan-out, complete graph, etc. More details about the different federation topologies can be found here. Since we want a completely transparent, highly available RabbitMQ fleet, a complete graph topology should be set up. Imagine the RabbitMQ fleet consists of the following three instances:

  1. ip-172-30-5-106
  2. ip-172-30-4-187
  3. ip-172-30-3-13

Since upstream links are one-directional, 6 upstream links (3 nodes × 2 upstreams each) are needed to form a complete graph between these nodes. Taking the three instances above as ‘a’, ‘b’ and ‘c’ in that order, and assuming that RabbitMQ is listening on port 5672, two new federation upstream parameters should be defined on node ‘a’ to make ‘b’ and ‘c’ its upstreams:

rabbitmqctl set_parameter federation-upstream upstreamB '{"uri":"amqp://ip-172-30-4-187:5672","max-hops":1}'

rabbitmqctl set_parameter federation-upstream upstreamC '{"uri":"amqp://ip-172-30-3-13:5672","max-hops":1}'

Executing the above commands defines two new parameters, upstreamB and upstreamC, for the federation-upstream component, each specifying an upstream URI with max-hops set to 1. With max-hops set to 1, downstream nodes do not pass the messages they receive from their upstreams on to their own downstream nodes. In a complete graph topology, max-hops should be set to 1; otherwise, nodes can receive back messages that they themselves previously replicated to their downstream nodes. In a ring topology of N nodes, on the other hand, max-hops should be set to N-1 so that every node receives every message.
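To see why max-hops matters, here is a toy hop-count model in Scala. It is an illustration under stated assumptions, not RabbitMQ's algorithm: a message published on one node is copied to every downstream of each node that holds it, as long as the copy stays within max-hops, and RabbitMQ's own loop-prevention details are deliberately ignored. Nodes are numbered 0 to N-1, and FederationHops, copiesPerNode, completeGraph and ring are hypothetical names.

```scala
object FederationHops {
  // downstreams(n) = nodes that declare n as an upstream, i.e. nodes that receive copies from n.
  // Returns how many copies of one message, published on `origin`, each node receives.
  def copiesPerNode(downstreams: Map[Int, Seq[Int]], origin: Int, maxHops: Int): Map[Int, Int] = {
    // All nodes reached from `node`, whose copy has already travelled `hops` hops.
    def spread(node: Int, hops: Int): Seq[Int] =
      if (hops >= maxHops) Seq.empty
      else downstreams.getOrElse(node, Seq.empty).flatMap(d => d +: spread(d, hops + 1))

    spread(origin, 0).groupBy(identity).map { case (node, copies) => node -> copies.size }
  }

  // Complete graph: every node is downstream of every other node.
  def completeGraph(n: Int): Map[Int, Seq[Int]] =
    (0 until n).map(i => i -> (0 until n).filterNot(_ == i)).toMap

  // Ring: node i is the upstream of node i + 1 (wrapping around).
  def ring(n: Int): Map[Int, Seq[Int]] =
    (0 until n).map(i => i -> Seq((i + 1) % n)).toMap
}
```

In this model, a complete graph of 3 nodes with max-hops 1 delivers exactly one copy to each of the other two nodes, and a ring of 4 nodes with max-hops 3 reaches every other node exactly once. Raising max-hops to 2 in the complete graph gives every node two copies, including the origin, which is the kind of duplication the paragraph above warns about.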

Note that by defining upstreams, we let brokers connect to each other remotely, and since the upstream URIs above contain no credentials, these connections use the default ‘guest’ user. By default, the ‘guest’ user cannot connect to a broker remotely, so the RabbitMQ configuration (rabbitmq.config) must be changed to allow remote access for ‘guest’. This can be achieved by setting the loopback_users configuration item to []:

[{rabbit, [{loopback_users, []}]}].

This way of letting RabbitMQ nodes access each other is safe if the nodes are located in private subnets inside a VPC. If the nodes are accessible from the internet, you need a more secure approach, for example configuring dedicated RabbitMQ users with specific access rights.

To complete the graph, the federation upstream parameters should be defined on nodes ‘b’ and ‘c’ in the same way as on node ‘a’. After that, we need to specify that federation is applied at the exchange level (and not at the queue level):

rabbitmqctl set_policy --apply-to exchanges federate-me "MyExchange" '{"federation-upstream-set":"all"}'

The above command defines a policy called “federate-me” that applies federation to every exchange whose name matches the pattern “MyExchange”. The pattern is a regular expression, so a single policy can cover several exchanges; anchor it as “^MyExchange$” if it should match that exact name only. As you can see, federation-upstream-set is set to ‘all’, which means the policy federates from all upstreams previously defined on the current node. You can also group upstreams into named sets and define different policies for different groups by setting ‘federation-upstream-set’ to the name of that group.
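Because the policy pattern is a regular expression, it is worth checking which exchange names it actually selects. The sketch below assumes the pattern is matched as a regex search, i.e. anywhere in the name unless anchored with ^ and $; PolicyPattern and matches are hypothetical helpers, and the exchange names MyExchange.audit and OtherExchange are made up for illustration.

```scala
object PolicyPattern {
  // Assumption: an unanchored pattern matches any exchange name that contains a match.
  def matches(pattern: String, exchangeName: String): Boolean =
    pattern.r.findFirstIn(exchangeName).isDefined

  def main(args: Array[String]): Unit = {
    val names = Seq("MyExchange", "MyExchange.audit", "OtherExchange")
    println(names.filter(matches("MyExchange", _)))   // List(MyExchange, MyExchange.audit)
    println(names.filter(matches("^MyExchange$", _))) // List(MyExchange)
  }
}
```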

Again, you need to execute the above set_policy command on all nodes. After that, the federation setup is complete. You can check the federation status with the RabbitMQ command line tool as follows:

sudo rabbitmqctl eval 'rabbit_federation_status:status().'
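To recap the per-node steps, the Scala sketch below generates the commands to run on each node of the three-node fleet: one federation upstream per other node (complete graph, max-hops 1) plus the exchange policy. It only builds strings and does not call rabbitmqctl. FederationSetup and commandsFor are hypothetical names, and the upstream naming scheme “upstream-&lt;host&gt;” is an assumption (the commands above use upstreamB and upstreamC).

```scala
object FederationSetup {
  // For each node, the rabbitmqctl commands to run on that node:
  // every other node becomes an upstream, then the exchange policy is applied.
  def commandsFor(hosts: Seq[String], port: Int = 5672, exchangePattern: String = "MyExchange"): Map[String, Seq[String]] =
    hosts.map { node =>
      val upstreams = hosts.filterNot(_ == node).map { host =>
        // Hypothetical naming scheme: "upstream-" followed by the upstream host name
        s"""rabbitmqctl set_parameter federation-upstream upstream-$host '{"uri":"amqp://$host:$port","max-hops":1}'"""
      }
      val policy =
        s"""rabbitmqctl set_policy --apply-to exchanges federate-me "$exchangePattern" '{"federation-upstream-set":"all"}'"""
      node -> (upstreams :+ policy)
    }.toMap

  def main(args: Array[String]): Unit = {
    val hosts = Seq("ip-172-30-5-106", "ip-172-30-4-187", "ip-172-30-3-13")
    for ((node, cmds) <- commandsFor(hosts); cmd <- cmds) println(s"[$node] $cmd")
  }
}
```

For three hosts this yields the 6 upstream definitions of the complete graph (two per node) and one policy per node.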

Federation vs Clustering in Cloud Environments

Performance

As mentioned here, performance tests using PerfTest show that federation throughput is considerably lower than clustering throughput. The reason is that federation works at a higher abstraction level than clustering.

Reliability

RabbitMQ clustering does not tolerate network partitions well, so it is recommended only where brokers are connected via reliable LAN links. Whether the connections between AZs are reliable is debatable. Here it is mentioned that clustering is fine when the brokers are distributed across multiple AZs. In my personal experience with a production system that ran for 6 months on a 3-node RabbitMQ cluster spread across 3 AZs, we observed the cluster being partitioned once due to a connection loss between AZs. If this level of unreliability is not acceptable in the context of your application, federation is recommended. If you use clustering, your monitoring system should detect a partitioned cluster immediately, and recovery can be done automatically by re-joining the nodes to the cluster.

Setting Up

Setting up a federated RabbitMQ fleet is more complicated than setting up a RabbitMQ cluster. In particular, when scaling up, every existing node must be reconfigured to add the newly added node as a new upstream, and the new node needs all existing nodes as its upstreams. This means that setting up an auto scaling group for a federated RabbitMQ fleet is not a trivial task.
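To make the reconfiguration cost concrete: a complete graph of N nodes has N(N-1) upstream links, so adding one node adds 2N links, N of which live on existing nodes that must be reconfigured. The sketch below lists those links as (downstream, upstream) pairs; ScaleUp, linksToAdd and totalLinks are hypothetical names, and the code does not talk to RabbitMQ.

```scala
object ScaleUp {
  // Upstream links (downstream -> upstream) to add when `newNode` joins a complete-graph fleet:
  // the new node takes every existing node as an upstream, and every existing node takes the new node.
  def linksToAdd(existing: Seq[String], newNode: String): Seq[(String, String)] =
    existing.map(node => newNode -> node) ++ existing.map(node => node -> newNode)

  // Total number of one-directional upstream links in a complete graph of n nodes.
  def totalLinks(n: Int): Int = n * (n - 1)
}
```

Growing the three-node fleet to four nodes adds 6 links (going from 6 to 12), and 3 of them require touching nodes that were already running.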

Summary

Federation is a RabbitMQ plugin for setting up distributed brokers. In general, on reliable networks, clustering is preferred due to its higher throughput and easier setup. Although the links between AWS AZs are not as reliable as network connections inside a single AZ, with an appropriate monitoring system in place, clustering is still preferred over federation. That said, this depends on your application context and the level of unreliability your application can accept.