Working with multiple Futures

When writing concurrent code, it is common to spawn several futures and then await them together according to some rule, such as waiting for all of them or for the first one to succeed.

Gears provides tools for dealing with multiple futures, both when the number of futures is statically known and when it is not (e.g. when you have a Seq[Future[T]]).

Combining two futures

Future.zip takes two futures Future[A] and Future[B] and returns a Future[(A, B)], completing when both input futures do.

Async.blocking:
  val a = Future(1)
  val b = Future("one")
  val (va, vb) = a.zip(b).await // (1, "one")

If one of the input futures throws an exception, it is re-thrown as soon as possible (i.e. without suspending until the other completes).

Future.or takes two Future[T]s and returns another Future[T], completing with the first value that one of the two futures succeeds with. If both throw, the resulting future fails with the exception that was thrown last.
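As a minimal sketch of this fail-fast behavior, the example below zips a slow future with one that throws. It assumes the usual Gears imports (gears.async.* and gears.async.default.given); the name zipFailsFast is hypothetical and used only for illustration.

```scala
import gears.async.*
import gears.async.default.given // assumed: default scheduler and async support
import scala.concurrent.duration.*

// Hypothetical helper, for illustration only.
def zipFailsFast(): String =
  Async.blocking:
    val slow = Future:
      AsyncOperations.sleep(1.hour)
      1
    val failing = Future:
      throw Exception("explode!")
    try
      // Expected to re-throw the failure without waiting for `slow`.
      slow.zip(failing).await
      "both completed"
    catch case e: Exception => e.getMessage
```

If zip behaves as described above, the call should return the exception message almost immediately instead of suspending for an hour.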

Async.blocking:
  val a = Future(1)
  val b = Future:
    AsyncOperations.sleep(1.hour)
    2
  val c = Future:
    throw Exception("explode!")

  a.or(b).await // returns 1 immediately
  b.or(a).await // returns 1 immediately
  c.or(a).await // returns 1 immediately
  c.or(c).await // throws

  a.orWithCancel(b).await // returns 1 immediately, and b is cancelled

Future.orWithCancel is a variant of Future.or where the slower Future is cancelled.

Combining a Sequence of futures

Seq[Future[T]].awaitAll takes a Seq[Future[T]] and returns a Seq[T] once all futures in the input Seq have completed. The results are in the same order as the futures in the original Seq. If one of the input futures throws an exception, it is re-thrown as soon as possible (i.e. without suspending until the others complete).

It is a more performant version of futures.fold(_.zip _).await.

Async.blocking:
  val a = Future(1)
  val b = Future:
    AsyncOperations.sleep(1.second)
    2
  val c = Future:
    throw Exception("explode!")

  Seq(a, b).awaitAll // Seq(1, 2), suspends for a second
  Seq(a, b, c).awaitAll // throws immediately

awaitAllOrCancel is a variant where all other futures are cancelled when one of them throws.

Seq[Future[T]].awaitFirst takes a Seq[Future[T]] and returns a T as soon as the first future in the input Seq succeeds. If all input futures throw an exception, the last exception is re-thrown.
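To illustrate the difference from awaitAll, here is a small sketch in which one future fails while another is still sleeping. It assumes the usual Gears imports and that a cancelled future completes with a Failure; the name awaitAllOrCancelDemo is hypothetical.

```scala
import gears.async.*
import gears.async.default.given // assumed: default scheduler and async support
import scala.concurrent.duration.*

// Hypothetical helper, for illustration only.
// Returns the re-thrown message and whether `slow` ended up failed (cancelled).
def awaitAllOrCancelDemo(): (String, Boolean) =
  Async.blocking:
    val slow = Future:
      AsyncOperations.sleep(1.hour)
      1
    val failing = Future:
      throw Exception("explode!")
    try
      Seq(slow, failing).awaitAllOrCancel
      ("all succeeded", false)
    catch
      case e: Exception =>
        // Assumption: `slow` was cancelled, so awaiting its result does not take an hour.
        (e.getMessage, slow.awaitResult.isFailure)
```

With plain awaitAll, the slow future would keep running after the exception is re-thrown, and awaiting its result would suspend until the sleep finishes.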

It is a more performant version of futures.fold(_.or _).await.

Async.blocking:
  val a = Future(1)
  val b = Future:
    AsyncOperations.sleep(1.second)
    2
  val c = Future:
    throw Exception("explode!")

  Seq(a, b).awaitFirst    // returns 1 immediately
  Seq(a, b, c).awaitFirst // returns 1 immediately

awaitFirstWithCancel is a variant where all other futures are cancelled when one succeeds.
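A minimal sketch of the cancelling variant, under the same assumptions as the other examples on this page (usual Gears imports, a cancelled future completing with a Failure). The name awaitFirstWithCancelDemo is hypothetical.

```scala
import gears.async.*
import gears.async.default.given // assumed: default scheduler and async support
import scala.concurrent.duration.*

// Hypothetical helper, for illustration only.
// Returns the winning value and whether the losing future ended up failed (cancelled).
def awaitFirstWithCancelDemo(): (Int, Boolean) =
  Async.blocking:
    val fast = Future(1)
    val slow = Future:
      AsyncOperations.sleep(1.hour)
      2
    val winner = Seq(fast, slow).awaitFirstWithCancel
    // Assumption: `slow` was cancelled once `fast` succeeded, so this returns promptly.
    (winner, slow.awaitResult.isFailure)
```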

Async.select

Async.select works similarly to awaitFirst, but lets you attach a body to handle the returned value, so the futures being raced do not have to be of the same type (as long as the handlers return the same type).

Compared to creating more futures, Async.select also guarantees that exactly one of the handlers is run, so you can be more confident putting side effects into the handlers.

Async.blocking:
  val a = Future(1)
  val b = Future("one")

  val v = Async.select(
    a.handle: va =>
      s"number $va was returned",
    b.handle: vb =>
      s"string `$vb` was returned"
  )

  println(v) // either "number 1 was returned", or "string `one` was returned"

Async.select takes SelectCase* varargs, so you can also build the cases by mapping over a sequence of futures.

Async.blocking:
  val futs = (1 to 10).map(Future.apply)
  val number = Async.select(futs.map(_.handle: v => s"$v returned")*)

Future.Collector

Future.Collector takes a sequence of futures and exposes a ReadableChannel (see footnote 1) that yields the futures as they complete.

Collector allows you to manually implement handling of multiple futures as they arrive, should you need more complex behaviors than the ones provided.

For example, here is a simplified version of .awaitFirst:

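import scala.annotation.tailrec
import scala.util.{Failure, Success}

def awaitFirst[T](futs: Seq[Future[T]])(using Async): T =
  val len = futs.length
  val collector = Future.Collector(futs*)
  @tailrec def loop(failed: Int): T =
    // Take the next completed future. Depending on the Gears version, read() may
    // return an Either that has to be unwrapped first.
    val fut = collector.results.read()
    fut.awaitResult match
      case Success(value) => value // first success wins
      case Failure(exc) =>
        // Re-throw only once every future has failed.
        if failed + 1 == len then throw exc
        else loop(failed + 1)
  loop(0)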

MutableCollector is a Collector that also lets you add more Futures after creation.

1. Learn more about channels in a future chapter.