Scala Futures
do not support built-in timeout unless you block on the future using scala.concurrent.Await as follows:
import scala.concurrent._ import scala.concurrent.duration._ import ExecutionContext.Implicits.global lazy val f = future { Thread.sleep(2000); true } Await.result(f, 1 second)
The above code will timeout:
java.util.concurrent.TimeoutException: Futures timed out after [1 second] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) ...
Sometimes non-blocking external libraries are providing futures as their functions return type but you cannot provide any timeout for those functions. Imagine you may want to ping a database service but you do not want to wait forever to receive the ping result. For example scalapenos client library for Riak provides a ping function returning Future[Boolean]
but you cannot provide any timeout for that function. Since it uses Riak HTTP APIs and connect to those API using spray-client pipelining, it relies on the default sendReceive
timeout which is 60 seconds.
But as you might have heard a lot, blocking on a future is not recommended because it wastes a thread. Instead, a non-blocking approach to provide a timeout to a future could be achieved using akka after
pattern:
import scala.concurrent._ import scala.concurrent.duration._ import ExecutionContext.Implicits.global import scala.util.{Failure, Success} import akka.actor.ActorSystem import akka.pattern.after val system = ActorSystem("theSystem") lazy val f = future { Thread.sleep(2000); true } lazy val t = after(duration = 1 second, using = system.scheduler)(Future.failed(new TimeoutException("Future timed out!"))) val fWithTimeout = Future firstCompletedOf Seq(f, t) fWithTimeout.onComplete { case Success(x) => println(x) case Failure(error) => println(error) }
The logic is quite self-explanatory. According to akka documentation, after
returns a future that will be completed with the success or failure of the provided value after the specified duration. So, in the above example, t
will fail after 1 second with TimeoutException
. Therefore, fWithTimeout
will be a future to the result of t
if it completes faster than f
meaning that f
has not completed within the expected time.
Notice that in order to have precise timing for the future execution, the computation of the provided future to after
function should not have started in advance. For example, in the above code, if you want to extract
Future.failed(new TimeoutException("Future timed out!"))
to a variable, you have to define it as a lazy val
and not a val
. Passing the future to after
inline as I did is also fine because the parameter is a by-name parameter.
The downside of this approach is its dependency on akka. Here you will find a lower level approach using pure scala (plus Netty HashedWheelTimer scheduler) where actually the after
pattern has been mimicked.
In order to have easier to use timeout enabled futures, we can simply extend scala futures to support timeout by using implicit class (“Pimp my Library” pattern):
import scala.concurrent._ import scala.concurrent.duration.FiniteDuration import ExecutionContext.Implicits.global import akka.actor.ActorSystem import akka.pattern.after implicit class FutureExtensions[T](f: Future[T]) { def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration, system: ActorSystem): Future[T] = { Future firstCompletedOf Seq(f, after(duration, system.scheduler)(Future.failed(timeout))) } }
Now we can enable timeout on any future easily:
import scala.concurrent._ import scala.concurrent.duration._ import scala.util.{ Success, Failure } import ExecutionContext.Implicits.global import akka.actor.ActorSystem implicit val system = ActorSystem("theSystem") implicit val timeout = 1 second lazy val f = future { Thread.sleep(2000); true } f withTimeout new TimeoutException("Future timed out!") onComplete { case Success(x) => println(x) case Failure(error) => println(error) }
As I mentioned earlier the above solution fits akka actor systems perfectly.
There is a small issue with this approach : it takes 1 thread per future. See https://github.com/m3dev/octoparts/blob/develop/app/com/m3/octoparts/future/RichFutureWithTimeout.scala for a scheduler based implementation
LikeLike
Thanks for the comment. I do not see any difference between these two approaches actually. About being scheduler based, here, the execution is also scheduler based. As you may see, I pass ActorSystem to “withTimeout” function and there its scheduler is being passed to “after” function. So the ActorSystem dispatcher will be used for executing the scheduled timeout future. By the way, notice that I pass the timeout future as by name parameter to after pattern so its not evaluated until the timeout reaches.
LikeLike
Can you elaborate on “1 thread per future”? I thought that every future always gets its own thread. I’m new to futures.
LikeLike
To execute a future, a thread is selected from an available execution context (thread pool) and that thread will compute the future. You can take a look to http://docs.scala-lang.org/overviews/core/futures.html or more detailed information about thread handling for future execution.
LikeLike