Concurrency with Future
Future
is the primary source of concurrency in a Gears program.
There are actually two kinds of futures: passive and active. However,
we shall look at active futures for now, and come back with passive futures
in the next chapter.
Spawning a concurrent computation with Future.apply
Future.apply
takes a body
of the type Async.Spawn ?=> T
(that is, an async computation)
and runs it completely concurrently to the current computation.
Future.apply
returns immediately with a value of type Future[T]
.
Eliding details (that we shall come back next chapter), there are two things you can do with this Future[T]
:
-
Await: calling
.await
on theFuture[T]
suspends the current Async context until theFuture
's body returns, and give you back the returned valueT
. Ifbody
throws an exception,.await
will also throw as well..awaitResult
is the alternative where both cases are wrapped inside aTry
.Being a suspension point,
.await
requires an Async context. -
Cancel: calling
.cancel()
on theFuture[T]
sends a cancellation signal to theFuture
. This would causebody
's execution to receiveCancellationException
on the next suspension point, and cancel all of thebody
's spawned Futures and so on, in a cascading fashion.After cancelling,
.await
-ing theFuture
will raise aCancellationException
.
An example
We can have a look at one simple example, implementing my favorite sorting algorithm, sleepsort!
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
import scala.collection.mutable
import scala.concurrent.duration.*
import gears.async.*
import gears.async.default.given
@main def sleepSort() =
Async.blocking: /* (spawn: Async.Spawn) ?=> */
val origin = Seq(50, 80, 10, 60, 40, 100)
// Spawn sleeping futures!
val buf = mutable.ArrayBuffer[Int]()
origin
.map: n =>
Future /*(using spawn)*/: /* (Async) ?=> */
AsyncOperations.sleep(n.millis)
buf.synchronized:
buf += n
.awaitAll
println(buf)
Let's walk through what's happening here:
- Starting from a
Seq[Int]
, we.map
the elements each to create aFuture[Unit]
. CallingFuture:
gives us a newAsync
context, passed into theFuture
's body. ThisAsync
context inherits the suspension implementation fromAsync.blocking
's context, and has a sub-scope withAsync.blocking
's context as its parent. We will talk about scoping in the next section. - In each
Future
, wesleep
for the amount of milliseconds the same as the element's value itself. Note thatsleep
would suspend theAsync
context given by theFuture
, i.e. the future's body, but not the one running underAsync.blocking
. Hence, it is totally okay to have thousands ofFuture
s thatsleep
! .map
gives us back aSeq[Future[Unit]]
immediately, which we can wait for all futures to complete with the extension method [.awaitAll
]. This suspendsAsync.blocking
context until all futures run to completion, and gives us backSeq[Unit]
. The return value is not interesting though, what we care about is thebuf
at the end.
Async.Spawn
If you noticed, Async.blocking
gives you an Async.Spawn
context
(rather than just Async
), which Future.apply
requires. What is it?
Recall that Async
, as a capability, gives you the ability to suspend the computation to wait for other values.
Async.Spawn
adds a new capability on top: it allows you to spawn concurrent computations as well.
Getting a Spawn
capability is actually very simple. You get an Async.Spawn
capability on Async.blocking
by default,
and both Future.apply
and Async.group
gives you an Async.Spawn
capability sub-scoped from a parent Async
scope.
Note that, however, most functions do not take an Async.Spawn
context by default.
This is due to the nature of Spawn
's capability to spawn computations that runs as long as the Spawn
scope is alive, which typically corresponds to the lexical scope of the function.
If functions take Async.Spawn
, they are allowed to spawn futures that are still computing even after the function itself returns!
def fn()(using Async.Spawn) =
val f = Future:
useCPUResources()
0
Async.blocking:
val n = fn()
// f is still running at this point!!!
This is way more difficult to reason about, so the "sane" behavior is to not take Async.Spawn
by default, and only
"upgrading" it (with Async.group
) inside a scope that will take care of cleaning up these "dangling" futures on return.
Most of Gear's library are structured this way, and we strongly recommend you to do the same in your function designs.