Scala Futures with Timeout

Scala Futures have no built-in timeout support. Out of the box, the only way to bound the wait is to block on the future using scala.concurrent.Await, as follows:


import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

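// Simulates a slow computation that takes two seconds
lazy val f = Future { Thread.sleep(2000); true }
// Blocks the calling thread for at most one second
Await.result(f, 1.second)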

The above code times out:

java.util.concurrent.TimeoutException: Futures timed out after [1 second]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
...
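
If you do block, the exception can be turned into a value rather than left to propagate. The following is a minimal sketch using only the standard library; the helper name awaitWithin is made up for illustration and is not part of any library.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

object AwaitTimeoutDemo {
  // Hypothetical helper: blocks for at most `limit` and captures the outcome,
  // including a TimeoutException thrown by Await, as a Try.
  def awaitWithin[T](f: Future[T], limit: FiniteDuration): Try[T] =
    Try(Await.result(f, limit))

  def main(args: Array[String]): Unit = {
    val slow = Future { Thread.sleep(2000); true }
    awaitWithin(slow, 1.second) match {
      case Success(value)               => println(s"completed: $value")
      case Failure(_: TimeoutException) => println("timed out")
      case Failure(other)               => println(s"failed: $other")
    }
  }
}
```

This still occupies the calling thread while waiting, so it only tidies up the error handling.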

Non-blocking external libraries sometimes return futures from their functions without letting you specify a timeout. Suppose you want to ping a database service but do not want to wait forever for the result. For example, the scalapenos client library for Riak provides a ping function returning Future[Boolean], but that function takes no timeout. Because it connects to Riak's HTTP API through spray-client pipelining, it relies on the default sendReceive timeout, which is 60 seconds.

But as you have probably heard many times, blocking on a future is not recommended because it wastes a thread. Instead, a timeout can be added to a future in a non-blocking way using Akka's after pattern:


import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.actor.ActorSystem
import akka.pattern.after

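val system = ActorSystem("theSystem")

// The computation we want to bound: it takes two seconds
lazy val f = Future { Thread.sleep(2000); true }
// A future that fails with TimeoutException once one second has elapsed
lazy val t = after(duration = 1.second, using = system.scheduler)(Future.failed(new TimeoutException("Future timed out!")))

// Completes with the result of whichever of f and t finishes first
val fWithTimeout = Future.firstCompletedOf(Seq(f, t))

fWithTimeout.onComplete {
  case Success(x) => println(x)
  case Failure(error) => println(error)
}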

The logic is straightforward. According to the Akka documentation, after returns a future that is completed with the success or failure of the provided value once the specified duration has elapsed. So, in the example above, t fails with a TimeoutException after 1 second. fWithTimeout therefore takes the result of t whenever t completes before f, which means that f did not complete within the expected time.
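
One consequence of racing two futures is worth spelling out: firstCompletedOf only picks the winner, it does not cancel the loser. The sketch below illustrates this with the standard library alone. An already-failed future stands in for the Akka timer so that it wins the race immediately, and the object and method names are made up for illustration.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object FirstCompletedDemo {
  // Returns (outcome of the race, whether the slow future still ran to completion)
  def run(): (Try[Boolean], Boolean) = {
    val finished = new AtomicBoolean(false)
    // The "slow" computation; it records that it reached its end
    val f = Future { Thread.sleep(300); finished.set(true); true }
    // Stand-in for the timer future: already failed, so it wins the race
    val t = Future.failed[Boolean](new TimeoutException("Future timed out!"))
    val raced = Future.firstCompletedOf(Seq(f, t))
    val outcome = Try(Await.result(raced, 1.second))
    // The losing future is not cancelled; wait for it to show that it finishes
    Await.ready(f, 2.seconds)
    (outcome, finished.get)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In other words, the timeout bounds how long the caller waits, not how long the underlying work runs.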

Notice that, for the timing to be precise, the future passed to the after function should not have started its computation in advance. For example, in the above code, if you want to extract

Future.failed(new TimeoutException("Future timed out!"))

to a variable, define it as a lazy val (or a def) rather than a val. For an already-completed future such as Future.failed, this mostly defers constructing the exception, but for a future that does real work, a val would start that work immediately instead of when the timer fires. Passing the future to after inline, as I did, is also fine because the parameter is a by-name parameter.
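
The difference between a strict and a by-name parameter can be seen in isolation with a small plain-Scala sketch. All names here (ByNameDemo, strict, byName, make) are made up for illustration; the log simply records the order in which things happen.

```scala
import scala.collection.mutable.ListBuffer

object ByNameDemo {
  // Records the order of events
  val log: ListBuffer[String] = ListBuffer.empty[String]

  // Building the argument has a visible side effect
  def make(label: String): String = { log += s"evaluated $label"; label }

  // Strict parameter: the argument is evaluated before the body runs
  def strict(value: String): String = { log += "inside strict"; value }

  // By-name parameter: the argument is evaluated only where `value` is used
  def byName(value: => String): String = { log += "inside byName"; value }

  def main(args: Array[String]): Unit = {
    strict(make("a"))
    byName(make("b"))
    log.foreach(println)
  }
}
```

With the strict parameter the argument is built before the method body runs; with the by-name parameter it is built only when the body asks for it, which is what lets after postpone creating the future until the delay has passed.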

The downside of this approach is its dependency on Akka. An alternative is a lower-level approach in pure Scala (plus Netty's HashedWheelTimer as the scheduler) that mimics the after pattern.
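
As a rough sketch of what such a lower-level approach might look like, the following mimics after without Akka. Note the substitution: it uses the JDK's ScheduledExecutorService as the timer instead of Netty's HashedWheelTimer, and the names PlainTimeout, after and withTimeout are assumptions made for this example, not an existing API.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService}
import scala.concurrent.{Await, ExecutionContext, Future, Promise, TimeoutException}
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal

object PlainTimeout {
  // Hypothetical stand-in for akka.pattern.after, backed by a JDK scheduler
  def after[T](delay: FiniteDuration, scheduler: ScheduledExecutorService)(value: => Future[T]): Future[T] = {
    val p = Promise[T]()
    scheduler.schedule(new Runnable {
      // `value` is by-name, so it is only evaluated when the timer fires
      def run(): Unit =
        try p.completeWith(value)
        catch { case NonFatal(e) => p.tryFailure(e) }
    }, delay.length, delay.unit)
    p.future
  }

  // Races `f` against a timer future that fails after `limit`
  def withTimeout[T](f: Future[T], limit: FiniteDuration, scheduler: ScheduledExecutorService)
                    (implicit ec: ExecutionContext): Future[T] = {
    val timer = after[T](limit, scheduler)(
      Future.failed(new TimeoutException(s"Future timed out after $limit")))
    Future.firstCompletedOf(Seq(f, timer))
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val slow = Future { Thread.sleep(2000); true }
    println(Try(Await.result(withTimeout(slow, 1.second, scheduler), 3.seconds)))
    scheduler.shutdownNow()
  }
}
```

A single scheduler thread is enough here because the scheduled task only completes a promise. The scheduler should be shared across calls and shut down when the application stops.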

To make timeout-enabled futures easier to use, we can extend Scala futures with timeout support by using an implicit class (the “Pimp my Library” pattern):


import scala.concurrent._
import scala.concurrent.duration.FiniteDuration
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import akka.pattern.after

implicit class FutureExtensions[T](f: Future[T]) {
  // `timeout` is by-name, so the exception is only constructed if the timer actually fires
  def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration, system: ActorSystem): Future[T] =
    Future.firstCompletedOf(Seq(f, after(duration, system.scheduler)(Future.failed(timeout))))
}

Now we can enable timeout on any future easily:


import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{ Success, Failure }
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem

implicit val system: ActorSystem = ActorSystem("theSystem")
implicit val timeout: FiniteDuration = 1.second

lazy val f = Future { Thread.sleep(2000); true }

f.withTimeout(new TimeoutException("Future timed out!")).onComplete {
  case Success(x) => println(x)
  case Failure(error) => println(error)
}

As I mentioned earlier, the above solution is a natural fit for applications that already use an Akka actor system.
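
Once a future can fail with a TimeoutException, the caller often wants a fallback rather than a failure, as in the ping scenario described earlier. A minimal standard-library sketch follows; the name pingOrFalse is hypothetical, and an already-failed future stands in for a ping that timed out.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object TimeoutFallback {
  // Hypothetical helper: treat a timed-out ping as "not reachable",
  // while letting every other failure propagate unchanged
  def pingOrFalse(ping: Future[Boolean]): Future[Boolean] =
    ping.recover { case _: TimeoutException => false }

  def main(args: Array[String]): Unit = {
    // Stand-in for a ping that has already timed out
    val timedOut = Future.failed[Boolean](new TimeoutException("Future timed out!"))
    println(Await.result(pingOrFalse(timedOut), 1.second))
  }
}
```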

4 thoughts on “Scala Futures with Timeout”

    • Nami says:

      Thanks for the comment. I do not actually see any difference between these two approaches. As for being scheduler based, the execution here is scheduler based as well. As you can see, I pass an ActorSystem to the “withTimeout” function, and its scheduler is then passed to the “after” function. So the ActorSystem dispatcher will be used for executing the scheduled timeout future. By the way, notice that I pass the timeout future as a by-name parameter to the after pattern, so it is not evaluated until the timeout elapses.
