Working with multiple Futures
When writing code that performs computation concurrently, it is common to spawn multiple futures and then await them together, following some particular logic.
Gears provides tools for dealing with multiple futures, both when their number is statically known and when it is not (i.e. when you have a Seq[Future[T]]).
Combining two futures
Future.zip takes two futures, Future[A] and Future[B], and returns a Future[(A, B)] that completes when both input futures do.
    import gears.async.*
    import gears.async.default.given // default scheduler and async operations

    Async.blocking:
      val a = Future(1)
      val b = Future("one")
      val (va, vb) = a.zip(b).await // (1, "one")
If one of the input futures throws an exception, it is rethrown as soon as possible (i.e. without suspending until the other completes).
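The fail-fast behavior of zip can be sketched as follows. This is a minimal illustration, assuming Gears is on the classpath and that the default runtime is brought in with `import gears.async.default.given`; the helper name `zipFailsFast` is hypothetical.

```scala
import gears.async.*
import gears.async.default.given
import scala.concurrent.duration.*
import scala.util.Try

// Hypothetical helper: returns true when zipping with a failed future
// rethrows the failure on await.
def zipFailsFast(): Boolean =
  Async.blocking:
    val slow = Future:
      AsyncOperations.sleep(1.hour) // would take an hour if zip waited for it
      1
    val failing: Future[String] = Future:
      throw Exception("explode!")
    // The failure surfaces without waiting for `slow`; `slow` is cancelled
    // when the Async.blocking scope exits.
    Try(slow.zip(failing).await).isFailure
```

Wrapping the await in `Try` keeps the exception from escaping the scope, so the outcome can be inspected as a value.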
Future.or takes two Future[T]s and returns another Future[T], completing with the first value that either of the two futures succeeds with.
If both throw, the exception that was thrown last is rethrown.
    import scala.concurrent.duration.* // for 1.hour

    Async.blocking:
      val a = Future(1)
      val b = Future:
        AsyncOperations.sleep(1.hour)
        2
      val c = Future:
        throw Exception("explode!")
      a.or(b).await // returns 1 immediately
      b.or(a).await // returns 1 immediately
      c.or(a).await // returns 1 immediately
      a.orWithCancel(b).await // returns 1 immediately, and b is cancelled
      c.or(c).await // throws
Future.orWithCancel is a variant of Future.or where the slower Future is cancelled.
Combining a Sequence of futures
Seq[Future[T]].awaitAll takes a Seq[Future[T]] and returns a Seq[T] once all futures in the input Seq have completed.
The results are in the same order as the corresponding Futures in the original Seq.
If one of the input futures throws an exception, it is rethrown as soon as possible (i.e. without suspending until the others complete).
It is a more performant version of futures.fold(_.zip _).await.
    Async.blocking:
      val a = Future(1)
      val b = Future:
        AsyncOperations.sleep(1.second)
        2
      val c = Future:
        throw Exception("explode!")
      Seq(a, b).awaitAll // Seq(1, 2), suspends for a second
      Seq(a, b, c).awaitAll // throws immediately
awaitAllOrCancel is a variant where all futures are cancelled as soon as one of them throws.
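A small sketch of the difference this makes, assuming the default Gears runtime (`gears.async.default.given`); the helper name `awaitAllOrCancelDemo` is hypothetical. After the failure is rethrown, the still-sleeping future has been cancelled, so awaiting its result yields a failure as well.

```scala
import gears.async.*
import gears.async.default.given
import scala.concurrent.duration.*
import scala.util.Try

// Hypothetical helper: true when awaitAllOrCancel rethrows the failure
// and the remaining future ends up cancelled.
def awaitAllOrCancelDemo(): Boolean =
  Async.blocking:
    val slow = Future:
      AsyncOperations.sleep(1.hour)
      1
    val failing: Future[Int] = Future:
      throw Exception("explode!")
    val outcome = Try(Seq(slow, failing).awaitAllOrCancel)
    // `slow` was cancelled rather than left sleeping, so it completes
    // with a failure instead of suspending us for an hour.
    outcome.isFailure && slow.awaitResult.isFailure
```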
Seq[Future[T]].awaitFirst takes a Seq[Future[T]] and returns a T as soon as the first future in the input Seq succeeds.
If all input futures throw an exception, the last exception is rethrown.
It is a more performant version of futures.fold(_.or _).await.
    Async.blocking:
      val a = Future(1)
      val b = Future:
        AsyncOperations.sleep(1.second)
        2
      val c = Future:
        throw Exception("explode!")
      Seq(a, b).awaitFirst // returns 1 immediately
      Seq(a, b, c).awaitFirst // returns 1 immediately
awaitFirstWithCancel is a variant where all other futures are cancelled when one succeeds.
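The cancelling variant can be sketched like this, again assuming the default Gears runtime; the helper name `awaitFirstWithCancelDemo` is hypothetical. The winner's value is returned, and the loser is observed to have completed with a failure because it was cancelled.

```scala
import gears.async.*
import gears.async.default.given
import scala.concurrent.duration.*

// Hypothetical helper: returns the winning value and whether the
// losing future was cancelled.
def awaitFirstWithCancelDemo(): (Int, Boolean) =
  Async.blocking:
    val fast = Future(1)
    val slow = Future:
      AsyncOperations.sleep(1.hour)
      2
    val first = Seq(fast, slow).awaitFirstWithCancel
    // `slow` has been cancelled, so its result is a failure and this
    // does not suspend for an hour.
    (first, slow.awaitResult.isFailure)
```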
Async.select
Async.select works similarly to awaitFirst, but allows you to attach a body to handle the returned value, so the futures being raced do not have to be of the same type (as long as the handlers return the same type).
Compared to creating more futures, Async.select also guarantees that exactly one of the handlers is run, so you can be more confident putting side effects into the handlers.
    Async.blocking:
      val a = Future(1)
      val b = Future("one")
      val v = Async.select(
        a.handle: va =>
          s"number $va was returned",
        b.handle: vb =>
          s"string `$vb` was returned"
      )
      println(v) // either "number 1 was returned", or "string `one` was returned"
Async.select takes SelectCase* varargs, so you can also .map a sequence of Futures into cases.
    Async.blocking:
      val futs = (1 to 10).map(Future.apply)
      // one handler per future; the result comes from whichever completes first
      val number = Async.select(futs.map(_.handle(v => s"$v returned"))*)
Future.Collector
Future.Collector takes a sequence of Futures and exposes a ReadableChannel [1] that returns the Futures as they complete.
Collector allows you to manually implement handling of multiple futures as they arrive, should you need more complex behavior than the combinators provided.
For example, here is a simplified version of .awaitFirst:
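    import scala.annotation.tailrec
    import scala.util.{Failure, Success}

    def awaitFirst[T](futs: Seq[Future[T]])(using Async): T =
      val len = futs.length
      val collector = Future.Collector(futs*)
      @tailrec def loop(failed: Int): T =
        // read() suspends until the next future completes. In current Gears it
        // returns an Either (Left when the channel is closed), hence the unwrapping.
        val fut = collector.results.read().toOption.get
        fut.awaitResult match
          case Success(value) => value
          case Failure(exc) =>
            // rethrow only once every future has failed
            if failed + 1 == len then throw exc
            else loop(failed + 1)
      loop(0)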
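As another illustration of the same mechanism, the sketch below reads results in completion order rather than input order. It assumes the default Gears runtime and that `results.read()` returns an `Either` whose `Right` holds the completed future; the helper name `completionOrder` is hypothetical.

```scala
import gears.async.*
import gears.async.default.given
import scala.concurrent.duration.*

// Hypothetical helper: spawns three futures with different delays and
// returns their values in the order in which they completed.
def completionOrder(): Seq[Int] =
  Async.blocking:
    val futs = Seq(300, 100, 200).map: ms =>
      Future:
        AsyncOperations.sleep(ms.millis)
        ms
    val collector = Future.Collector(futs*)
    // One read per future; each read suspends until the next future completes.
    futs.map(_ => collector.results.read().toOption.get.await)
```

With delays this far apart, the returned sequence should be ordered by delay (100, 200, 300), not by position in the input.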
MutableCollector is a Collector that also lets you add more Futures after creation.
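A minimal sketch of adding a future after creation. It assumes the default Gears runtime, that MutableCollector exposes an `add` method for registering futures, and that `results.read()` returns an `Either` whose `Right` holds the completed future; check these against the API documentation of your Gears version. The helper name `mutableCollectorDemo` is hypothetical.

```scala
import gears.async.*
import gears.async.default.given

// Hypothetical helper: reads one result, then registers a follow-up
// future that depends on it and reads that one too.
def mutableCollectorDemo(): Seq[Int] =
  Async.blocking:
    val collector = Future.MutableCollector(Future(1))
    val first = collector.results.read().toOption.get.await
    // Register another future after the collector was created
    // (`add` is assumed here).
    collector.add(Future(first + 1))
    val second = collector.results.read().toOption.get.await
    Seq(first, second)
```

This pattern is useful when the set of futures to wait on is only discovered while earlier results are being processed.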
[1] Learn more about channels in a future chapter.