Concurrency with Future
Future is the primary source of concurrency in a Gears program.
There are actually two kinds of futures: passive and active. However,
we shall look at active futures for now, and come back to passive futures
in the next chapter.
Spawning a concurrent computation with Future.apply
Future.apply takes a body of the type Async.Spawn ?=> T (that is, an async computation)
and runs it completely concurrently with the current computation.
Future.apply returns immediately with a value of type Future[T].
Eliding details (which we shall come back to in the next chapter), there are two things you can do with this Future[T]:
- Await: calling .await on the Future[T] suspends the current Async context until the Future's body returns, and gives you back the returned value T. If the body throws an exception, .await will throw it as well. .awaitResult is the alternative where both cases are wrapped inside a Try. Being a suspension point, .await requires an Async context.
- Cancel: calling .cancel() on the Future[T] sends a cancellation signal to the Future. This causes the body's execution to receive a CancellationException at the next suspension point, and cancels all of the body's spawned Futures and so on, in a cascading fashion. After cancelling, .await-ing the Future will raise a CancellationException.
An example
Let's have a look at a simple example implementing my favorite sorting algorithm, sleepsort!
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
import scala.collection.mutable
import scala.concurrent.duration.*
import gears.async.*
import gears.async.default.given
@main def sleepSort() =
  Async.blocking: /* (spawn: Async.Spawn) ?=> */
    val origin = Seq(50, 80, 10, 60, 40, 100)
    // Spawn sleeping futures!
    val buf = mutable.ArrayBuffer[Int]()
    origin
      .map: n =>
        Future /*(using spawn)*/: /* (Async) ?=> */
          AsyncOperations.sleep(n.millis)
          buf.synchronized:
            buf += n
      .awaitAll
    println(buf)
Let's walk through what's happening here:
- Starting from a Seq[Int], we .map each element to create a Future[Unit]. Calling Future: gives us a new Async context, passed into the Future's body. This Async context inherits the suspension implementation from Async.blocking's context, and has a sub-scope with Async.blocking's context as its parent. We will talk about scoping in the next section.
- In each Future, we sleep for as many milliseconds as the element's value itself. Note that sleep suspends the Async context given by the Future, i.e. the future's body, but not the one running under Async.blocking. Hence, it is totally okay to have thousands of Futures that sleep!
- .map gives us back a Seq[Future[Unit]] immediately, on which we can wait for all futures to complete with the extension method .awaitAll. This suspends Async.blocking's context until all futures run to completion, and gives us back a Seq[Unit]. The return value is not interesting though; what we care about is buf at the end.
Async.Spawn
As you may have noticed, Async.blocking gives you an Async.Spawn context
(rather than just Async), which Future.apply
requires. What is it?
Recall that Async, as a capability, gives you the ability to suspend the computation to wait for other values.
Async.Spawn adds a new capability on top: it allows you to spawn concurrent computations as well.
Getting a Spawn capability is actually very simple. You get an Async.Spawn capability from Async.blocking by default,
and both Future.apply and Async.group give you an Async.Spawn capability sub-scoped from a parent Async scope.
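As a rough illustration of the difference (these function names are made up for this sketch, not taken from the library), a function that only takes Async can suspend, for example by awaiting futures handed to it, while Future.apply will only compile inside a function that has Async.Spawn:
import gears.async.*

// With plain Async we can suspend (await futures we were given),
// but calling Future.apply in this body would not compile.
def sumResults(futures: Seq[Future[Int]])(using Async): Int =
  futures.map(_.await).sum

// With Async.Spawn we may also spawn futures of our own.
def squaresSum(xs: Seq[Int])(using Async.Spawn): Int =
  xs.map(n => Future(n * n)).awaitAll.sum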
Note, however, that most functions do not take an Async.Spawn context by default.
This is due to the nature of the Spawn capability: it allows spawning computations that run as long as the Spawn scope is alive, which typically corresponds to the lexical scope of the function.
If functions take Async.Spawn, they are allowed to spawn futures that are still computing even after the function itself returns!
def fn()(using Async.Spawn) =
  val f = Future:
    useCPUResources()
  0

Async.blocking:
  val n = fn()
  // f is still running at this point!!!
This is way more difficult to reason about, so the "sane" behavior is to not take Async.Spawn by default, and only
"upgrade" it (with Async.group) inside a scope that will take care of cleaning up these "dangling" futures on return.
Most of the Gears library is structured this way, and we strongly recommend you do the same in your function designs.
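As a rough sketch of this pattern (the function and helper names are invented for illustration, and it assumes the same dependencies as the earlier examples): take a plain Async context and upgrade it locally with Async.group, so that any futures spawned inside are cleaned up before the function returns.
import scala.concurrent.duration.*
import gears.async.*
import gears.async.default.given

// Only requires Async: the futures it spawns live inside Async.group,
// which takes care of any that are still running when the body returns.
def fetchBoth()(using Async): (Int, Int) =
  Async.group:
    val a = Future(compute(1))
    val b = Future(compute(2))
    (a.await, b.await)

// A stand-in for some real work; purely illustrative.
def compute(n: Int)(using Async): Int =
  AsyncOperations.sleep(10.millis)
  n * 10

@main def groupedDemo() =
  Async.blocking:
    println(fetchBoth()) // (10, 20)
Callers of fetchBoth only need an Async context, and no future spawned inside it can outlive the call.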