Immutability and Co/Contravariance

After a long long time…

Immutability also Empowers Type Safety.

There are many reasons to use immutable data structures, and you have probably heard most of them. For example, immutable data can be shared safely across multiple threads. Beyond these well-known benefits, immutability also lets us use more expressive type systems. Here is an example:

In Java, Lists are not covariant. What does that mean? Long story short, List<Cat> is not a subtype of List<Animal>. If it were, the following code would compile and then bark at runtime:

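import java.util.ArrayList;
import java.util.List;

class Animal {}
class Cat extends Animal {}
class Dog extends Animal {}

List<Cat> cats = new ArrayList<>();
List<Animal> animals = cats;   // rejected by the real compiler; pretend it were allowed
animals.add(new Dog());
Cat persian = cats.get(0);     // DOG IS NOT CAT!!!! (ClassCastException)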

Lists (and other containers) not being covariant makes our lives hard. For example, if I have a function that accepts List<Animal>, I cannot pass a List<Cat> to it. I have to resort to a wildcard such as List<? extends Animal> at every use site, which is verbose and easy to get wrong.

In contrast, in Scala, Lists are covariant. So, the following code will compile:

val cats: List[Cat] = List(new Cat(), new Cat())
val animals: List[Animal] = cats

Why is Scala brave enough to let you do that? Because you cannot change animals after declaring it, and that is why, in Scala (and Kotlin), only immutable collections are covariant. So you cannot add any Dog to animals! In other words, an immutable list does not expose any function that consumes an instance of T. If it did, the following case would happen:

// Hypothetical: this does NOT compile, because T appears in a contravariant position
class List[+T] {
  def consume(a: T, index: Int): Unit = {
    underlying(index) = a
  }
}

val cats: List[Cat] = List(new Cat(), new Cat())
val animals: List[Animal] = cats
animals.consume(new Dog, 0) // change the first cat to a dog!!!

Instead, instances of T are only ever produced by List member functions. In fancier terms, T is always used in a covariant position and never in a contravariant position. If you define your own covariant class with a member that consumes an instance of the type parameter, it will fail to compile.

class MyClass[+A] {
   def set(a: A): Unit = {} // fails to compile as an instance of A is used in a contravariant position
}

class MyClass[+A] {
   def get(): A = ... // it compiles, type parameter is used in covariant position
}
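
If a covariant class really needs a method that accepts an element, the usual way out is a lower-bounded type parameter, which is also how the standard library declares List's :: method (def ::[B >: A](elem: B): List[B]). Below is a minimal sketch of that idea. Stack, push and the other names are made up for illustration and are not part of any library:

```scala
object CovariantStackSketch {
  class Animal
  class Cat extends Animal
  class Dog extends Animal

  // A tiny immutable, covariant stack (hypothetical, for illustration only).
  sealed trait Stack[+A] {
    // `B >: A` keeps A out of the contravariant position: pushing a Dog
    // onto a Stack[Cat] returns a *new* Stack[Animal] and leaves the
    // original stack untouched.
    def push[B >: A](elem: B): Stack[B] = NonEmpty(elem, this)
    def headOption: Option[A]
    def size: Int
  }

  case object Empty extends Stack[Nothing] {
    def headOption: Option[Nothing] = None
    def size: Int = 0
  }

  final case class NonEmpty[+A](head: A, tail: Stack[A]) extends Stack[A] {
    def headOption: Option[A] = Some(head)
    def size: Int = 1 + tail.size
  }

  val cats: Stack[Cat] = Empty.push(new Cat).push(new Cat)

  // B is inferred as Animal, the closest common supertype of Cat and Dog.
  val animals: Stack[Animal] = cats.push(new Dog)
}
```

Because push never mutates anything, cats still contains only cats afterwards; the Dog lives only in the widened animals stack.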

Kotlin takes a similar approach to Scala, but what I like more in Kotlin (only in this very specific case) is that, to define covariant or contravariant classes, it uses out and in instead of + and -, which clearly emphasizes producing and consuming.
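
For completeness, here is the consuming side in Scala, where - plays the role of Kotlin's in. This is only a small sketch; Describer and the other names are hypothetical:

```scala
object ContravarianceSketch {
  class Animal { def name: String = "animal" }
  class Cat extends Animal { override def name: String = "cat" }

  // A only appears as a method argument (it is consumed), so `-A` compiles.
  trait Describer[-A] {
    def describe(a: A): String
  }

  val animalDescriber: Describer[Animal] = new Describer[Animal] {
    def describe(a: Animal): String = s"some ${a.name}"
  }

  // Contravariance flips the subtyping direction:
  // anything that can describe every Animal can certainly describe a Cat.
  val catDescriber: Describer[Cat] = animalDescriber

  val description: String = catDescriber.describe(new Cat)
}
```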

CRDT: Conflict-Free Replicated Data Type

Recently, a colleague of mine and I gave a presentation about CRDTs at the Lambda Days conference. The video has just been released, so I decided to dedicate a blog post to it:

And here is another version of our talk, which was presented at the Curry On conference:

And yet another version, presented at Strange Loop by my friend Dmitry Ivanov.

Scala Futures with Timeout

Scala Futures have no built-in timeout support unless you block on the future using scala.concurrent.Await, as follows:


import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

// f needs 2 seconds to complete, but we only wait for 1
lazy val f = Future { Thread.sleep(2000); true }
Await.result(f, 1.second)

The above code will time out:

java.util.concurrent.TimeoutException: Futures timed out after [1 second]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
...

Sometimes non-blocking external libraries return futures from their functions but give you no way to specify a timeout. Imagine you want to ping a database service but do not want to wait forever for the result. For example, the scalapenos client library for Riak provides a ping function returning Future[Boolean], but you cannot pass a timeout to it. Since it uses the Riak HTTP APIs and connects to them using spray-client pipelining, it relies on the default sendReceive timeout, which is 60 seconds.

But as you might have heard many times, blocking on a future is not recommended because it wastes a thread. Instead, you can add a timeout to a future in a non-blocking way using the Akka after pattern:


import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.actor.ActorSystem
import akka.pattern.after

val system = ActorSystem("theSystem")

lazy val f = Future { Thread.sleep(2000); true }
// t fails with a TimeoutException once 1 second has elapsed
lazy val t = after(duration = 1.second, using = system.scheduler)(Future.failed(new TimeoutException("Future timed out!")))

val fWithTimeout = Future firstCompletedOf Seq(f, t)

fWithTimeout.onComplete {
   case Success(x) => println(x)
   case Failure(error) => println(error)
}

The logic is fairly self-explanatory. According to the Akka documentation, after returns a future that will be completed with the success or failure of the provided value after the specified duration. So, in the above example, t fails after 1 second with a TimeoutException. Future.firstCompletedOf completes with the result of whichever of f and t finishes first, so fWithTimeout fails with the TimeoutException whenever f has not completed within the expected time. Note that f itself is not cancelled; it keeps running in the background.

Note that, for the timing to be accurate, the future passed to after must not have been started in advance. For example, in the above code, if you want to extract

Future.failed(new TimeoutException("Future timed out!"))

to a variable, you have to define it as a lazy val and not a val. Passing the future to after inline, as I did, is also fine because the parameter is a by-name parameter.
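
To make the difference between val, lazy val and a by-name parameter concrete, here is a small sketch that counts when a future actually gets created. The later function is a hypothetical stand-in that only mirrors the by-name shape of after's last parameter; it does not schedule anything:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future

object EvaluationTimingSketch {
  // Hypothetical stand-in for a function with a by-name future parameter.
  // The argument expression is only evaluated when the returned thunk runs.
  def later[T](value: => Future[T]): () => Future[T] = () => value

  def demo(): List[Int] = {
    val counter = new AtomicInteger(0)
    // Creating the future bumps the counter, so we can see when it happens.
    def start(): Future[Int] = { counter.incrementAndGet(); Future.successful(1) }

    val eager = start()            // a val is evaluated right here
    val c1 = counter.get()

    lazy val deferred = start()    // a lazy val is not evaluated yet
    val c2 = counter.get()

    val thunk = later(deferred)    // by-name: deferred is still not forced
    val c3 = counter.get()

    thunk()                        // only now is deferred evaluated
    val c4 = counter.get()

    List(c1, c2, c3, c4)           // List(1, 1, 1, 2)
  }
}
```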

The downside of this approach is its dependency on Akka. Here you will find a lower-level approach using pure Scala (plus the Netty HashedWheelTimer scheduler) that mimics the after pattern.
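
As a rough idea of what such an Akka-free version can look like, here is a minimal sketch that swaps in the JDK's ScheduledExecutorService instead of Netty's HashedWheelTimer. It is not the linked implementation; PlainTimeoutSketch, after and withTimeout are names I made up for this example:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal

object PlainTimeoutSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // One daemon thread is enough: it only completes promises, it does no real work.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "timeout-scheduler")
        t.setDaemon(true)
        t
      }
    })

  // Mimics the shape of the after pattern: evaluates `value` only once `delay` has elapsed.
  def after[T](delay: FiniteDuration)(value: => Future[T]): Future[T] = {
    val p = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = {
        p.completeWith(try value catch { case NonFatal(e) => Future.failed(e) })
        ()
      }
    }, delay.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }

  // Races f against a future that fails after `timeout`; f itself is not cancelled.
  def withTimeout[T](f: Future[T], timeout: FiniteDuration): Future[T] =
    Future.firstCompletedOf(Seq(
      f,
      after(timeout)(Future.failed[T](new TimeoutException(s"withTimeout: no result within $timeout")))
    ))

  def main(args: Array[String]): Unit = {
    val slow = withTimeout(Future { Thread.sleep(2000); true }, 200.millis)
    // Await is used here only to observe the outcome in a demo.
    println(Try(Await.result(slow, 1.second))) // a Failure wrapping the TimeoutException
  }
}
```

A single-threaded scheduler keeps the sketch short. A hashed wheel timer is typically preferred when a very large number of timeouts are in flight at once.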

To make timeout-enabled futures easier to use, we can extend Scala futures with a timeout method by using an implicit class (the “Pimp my Library” pattern):


import scala.concurrent._
import scala.concurrent.duration.FiniteDuration
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import akka.pattern.after

implicit class FutureExtensions[T](f: Future[T]) {
  def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration, system: ActorSystem): Future[T] = {
      Future firstCompletedOf Seq(f, after(duration, system.scheduler)(Future.failed(timeout)))
  }
}

Now we can enable timeout on any future easily:


import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{ Success, Failure }
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem

implicit val system: ActorSystem = ActorSystem("theSystem")
implicit val timeout: FiniteDuration = 1.second

lazy val f = Future { Thread.sleep(2000); true }

f withTimeout new TimeoutException("Future timed out!") onComplete {
  case Success(x) => println(x)
  case Failure(error) => println(error)
}

As I mentioned earlier, the above solution is a natural fit for applications that already run an Akka actor system.