Introduction to Gears
This is a book about Gears, an experimental asynchronous programming library for Scala 3!
Gears aims to provide a basis for writing cross-platform, high-level asynchronous code with direct-style Scala and structured concurrency, while allowing library implementations to deal with external asynchronous IO through featureful primitives and to expose a simple direct-style API.
While Gears is currently in an experimental stage (definitely not recommended for production!), we provide basic support for:
- Virtual-threads-enabled JVM environments (JRE 21 or later, or JRE 19 with experimental virtual threads enabled)
- Scala Native 0.5.0 or later with delimited continuations support (on Linux, MacOS and BSDs).
Adding Gears to your own Scala Project
Gears can be added to your own project by adding the following dependency with your preferred build tool:
- With sbt:
    libraryDependencies += "ch.epfl.lamp" %%% "gears" % "0.2.0"
- With mill:
    def ivyDeps = Agg(
      // ... other dependencies
      ivy"ch.epfl.lamp::gears::0.2.0"
    )
- With scala-cli:
    //> using dep "ch.epfl.lamp::gears::0.2.0"
Setting up gears
Running Examples in this Book
The book contains some examples in the src/scala directory.
For example, this is the hello example, located in src/scala/hello.scala:
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
import gears.async.*
import gears.async.default.given

@main def main() =
  Async.blocking:
    val hello = Future:
      print("Hello")
    val world = Future:
      hello.await
      println(", world!")
    world.await
All such examples can be run with scala-cli, provided the following dependencies are available:
- A locally published version of gears.
- Additional platform dependencies, as described below.
Compiling gears
To compile gears from source, clone gears from the GitHub repository:
git clone https://github.com/lampepfl/gears --recursive
Then run sbt rootJVM/publishLocal.
For Scala Native
Make sure to prepare additional dependencies for Scala Native.
You will also need to locally publish a custom version of munit for Scala Native 0.5.0, which is available as a git submodule of the gears repository.
To locally publish munit, run the following from the root directory of the gears repository:
git submodule update # Create and update submodules
dependencies/publish_deps.sh # Locally publish pinned dependencies
Finally, gears artifacts for Scala Native can be locally published with:
sbt rootNative/publishLocal
Trying out examples
On JVM
As mentioned in the introduction, we require a JVM version that supports virtual threads. With that provided, examples can be run simply with
scala-cli run "path-to-example"
For the hello example:
scala-cli run src/scala/hello.scala
On Scala Native
The current Scala Native version required by Gears is 0.5.1.
Therefore, the examples in this book hardcode 0.5.1 as the Scala Native version, which is also the version gears is currently compiled against.
Examples can be run with
scala-cli --platform scala-native run "path-to-example"
For the hello example:
scala-cli --platform scala-native run src/scala/hello.scala
An "Async" function
The central concept of Gears programs is functions taking an Async context, commonly referred to in this book as async functions.
import gears.async.*
import gears.async.default.given // provides the AsyncOperations instance that sleep needs

def asyncFn(n: Int)(using Async): Int =
  //                      ^^^^^ an async context
  AsyncOperations.sleep(n * 100 /* milliseconds */)/*(using Async)*/
  n + 1
We can look at the Async context from two points of view, both as a capability and as a context:
- As a capability, Async represents the ability to suspend the current computation, waiting for some event to arrive (such as timeouts, external I/O or even other concurrent computations). While the current computation is suspended, we allow the runtime to use the CPU for other tasks, utilizing it more effectively (compared to just blocking the computation until the wanted event arrives).
- As a context, Async holds the necessary runtime information for the scheduler to know which computation should be suspended and how to resume it later. It also contains a concurrency scope, which we will see later in the structured concurrency section.
However, unlike other languages with a concept of async functions, Gears' async functions are just normal functions with an implicit Async parameter!
This means we can also take the parameter explicitly, instead of declaring it with using Async:
def alsoAsyncFn(async: Async): Int =
  asyncFn(10)(using async)
and do anything with the Async context as if it were a variable[1]!
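As a small sketch of treating the context as an ordinary value (assuming the Gears 0.2.0 API used throughout this book), the block below summons the Async given provided by Async.blocking into a val and passes it explicitly. The sleep is shortened to 10 milliseconds per unit, and the helper explicitContextDemo is hypothetical, introduced only for illustration.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given

def asyncFn(n: Int)(using Async): Int =
  AsyncOperations.sleep(n * 10) // milliseconds, shortened for the sketch
  n + 1

// Takes the context as a plain, explicit parameter.
def alsoAsyncFn(async: Async): Int =
  asyncFn(10)(using async)

// Hypothetical helper: mixes implicit and explicit passing of the same context.
def explicitContextDemo(): Int =
  Async.blocking:
    // Inside Async.blocking a context is in scope; summon turns it into a value.
    val ctx: Async = summon[Async]
    asyncFn(1) + alsoAsyncFn(ctx) // 2 + 11
```

Both calls run under the same root context, so explicitContextDemo() should return 13.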
Passing on Async contexts
Let's look at a more fully-fledged example, src/scala/counting.scala:
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
import gears.async.*
import gears.async.default.given

/** Counts to [[n]], sleeping for 100 milliseconds in between. */
def countTo(n: Int)(using Async): Unit =
  (1 to n).foreach: i =>
    AsyncOperations.sleep(100 /* milliseconds */ ) /*(using Async)*/
    println(s"counted $i")

@main def Counting() =
  Async.blocking: /* (async: Async) ?=> */
    countTo(10)
    println("Finished counting!")
(If you see //> directives in an example, it means the example can be run self-contained!)
Let's look at a few interesting points in this example:
- Async.blocking: Async.blocking gives you a "root" Async context, given implementations of the supporting layer (neatly provided by import gears.async.default.given!). This defines a "scope", not too different from scala.util.Using or scala.util.boundary. As usual, the context provided within the scope should not escape it. Like scala.util.boundary's boundary.Label, the Async context will be passed implicitly to other functions taking an Async context. Note that Async.blocking is the only provided means of creating an Async context out of "nothing", so it is easy to keep track of when adopting gears into your codebase!
- countTo returns Unit instead of Future[Unit] or an equivalent. This indicates that calling countTo will "block" the caller until countTo has returned with a value. No await is needed, and countTo behaves just like a normal function! Of course, with the Async context passed, countTo is allowed to perform operations that suspend, such as sleep (which suspends the Async context for the given number of milliseconds). This illustrates an important concept in Gears: in most common cases, we write functions that accept a suspendable context, and calling them will block until they return! While it is completely normal to spawn concurrent/parallel computations and join/race them, as we will see in the next chapter, "natural" APIs written with Gears should have the same control flow as non-Async functions.
- Within countTo, note that we call sleep inside a function passed to Seq.foreach, effectively capturing Async within the function. This is completely fine: foreach runs the given function in the same Async context (neither outside nor in a sub-context), and does not capture the function. Our capability and scoping rules are maintained, and foreach is Async-capable by default! While this illustrates the above fact, we could also have written the function in a familiar fashion with a for comprehension:
    /** Counts to [[n]], sleeping for 100 milliseconds in between. */
    def countTo(n: Int)(using Async): Unit =
      for i <- 1 to n do
        AsyncOperations.sleep(100.millis) // 100.millis needs import scala.concurrent.duration.*
        println(s"counted $i")
That's all for now about the Async context. In the next chapter, we will properly introduce concurrency to our model.
Aside: Pervasive Async problem?
At this point, if you've done asynchronous programming with JavaScript, Rust or C# before, you might wonder whether Gears' solution comes with the "What color is your function?" problem.
It is true that the Async context divides the function space into functions requiring it and functions that don't, and generally you need an Async context to call an async function[2].
However:
- Writing async-polymorphic higher-order functions is trivial in Gears: if you don't care about Async contexts when taking in function arguments (both () => T and Async ?=> T work), simply take () => T, and Async-aware blocks will inherit the context from the caller! One obvious example is Seq.foreach from above. In fact, all current Scala collection APIs should still work with no changes. Of course, if the function is applied repeatedly, it will be run sequentially rather than concurrently, but that is the expected behavior of Seq.foreach.
- The presence of an Async context explicitly declares that a certain function requires runtime support for suspension, as well as the presence of a concurrency scope (for cancellation purposes). Ultimately, that means the compiler does not have to be pessimistic and compile all functions in a suspendable way (à la Go), which would hurt both performance and interoperability (especially with C on Scala Native). For the user, it is also a clear indication that calling the function may suspend to wait for (in most cases) external events (IO, filesystem, sleep, ...), and that they should prepare accordingly. Likewise, libraries using Gears should not be performing such tasks if they don't take an Async context.
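To make the first point concrete, here is a minimal sketch under the Gears 0.2.0 API used in this book: repeat is a hypothetical higher-order function whose signature mentions nothing about Async, yet the lambda passed to it can still sleep, because it captures the context of the enclosing Async.blocking. repeatDemo is likewise a hypothetical helper.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import scala.collection.mutable.ListBuffer
import gears.async.*
import gears.async.default.given

// An ordinary higher-order function: no Async anywhere in its signature.
def repeat(times: Int)(action: () => Unit): Unit =
  for _ <- 1 to times do action()

def repeatDemo(): List[Int] =
  Async.blocking:
    val seen = ListBuffer[Int]()
    var count = 0
    repeat(3) { () =>
      // The lambda captures the Async context of Async.blocking,
      // so it may suspend even though `repeat` knows nothing about Async.
      AsyncOperations.sleep(10)
      count += 1
      seen += count
    }
    seen.toList
```

The calls run one after another in the same context, so repeatDemo() should return List(1, 2, 3).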
With that in mind, it is useful (as with all languages with async functions) to treat Async like a capability: only pass it to functions handling async operations, and try to isolate business logic into functions that don't.
[1] While in principle this is possible, capability and scoping rules apply to the Async context: functions taking Async capabilities should not capture it in a way that outlives the execution of the function's body.
In the future, capture checking should be able to find such violations and report them at compile time.
[2] Technically, Async.blocking can be used to call any async function without an async context, but you should be aware of its pitfalls. That said, if you are migrating from a synchronous codebase, Async.blocking calls are functional bridges between migrated and in-progress parts of the codebase.
Concurrency with Future
Future is the primary source of concurrency in a Gears program.
There are actually two kinds of futures: passive and active. For now, we shall look at active futures, and come back to passive futures in the next chapter.
Spawning a concurrent computation with Future.apply
Future.apply takes a body of the type Async.Spawn ?=> T (that is, an async computation) and runs it fully concurrently with the current computation.
Future.apply returns immediately with a value of type Future[T].
Eliding details (which we shall come back to in the next chapter), there are two things you can do with this Future[T]:
- Await: calling .await on the Future[T] suspends the current Async context until the Future's body returns, and gives you back the returned value T. If body throws an exception, .await will throw it as well. .awaitResult is the alternative where both cases are wrapped inside a Try. Being a suspension point, .await requires an Async context.
- Cancel: calling .cancel() on the Future[T] sends a cancellation signal to the Future. This causes body's execution to receive a CancellationException at the next suspension point, and cancels all of body's spawned Futures and so on, in a cascading fashion. After cancelling, .await-ing the Future will raise a CancellationException.
An example
We can have a look at one simple example, implementing my favorite sorting algorithm, sleepsort!
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
import scala.collection.mutable
import scala.concurrent.duration.*
import gears.async.*
import gears.async.default.given

@main def sleepSort() =
  Async.blocking: /* (spawn: Async.Spawn) ?=> */
    val origin = Seq(50, 80, 10, 60, 40, 100)
    // Spawn sleeping futures!
    val buf = mutable.ArrayBuffer[Int]()
    origin
      .map: n =>
        Future /*(using spawn)*/: /* (Async) ?=> */
          AsyncOperations.sleep(n.millis)
          buf.synchronized:
            buf += n
      .awaitAll
    println(buf)
Let's walk through what's happening here:
- Starting from a Seq[Int], we .map each element to a Future. Calling Future: gives us a new Async context, passed into the Future's body. This Async context inherits the suspension implementation from Async.blocking's context, and has a sub-scope with Async.blocking's context as its parent. We will talk about scoping in the next section.
- In each Future, we sleep for a number of milliseconds equal to the element's value. Note that sleep suspends the Async context given by the Future, i.e. the future's body, but not the one running under Async.blocking. Hence, it is totally okay to have thousands of Futures that sleep!
- .map gives us back a Seq of Futures immediately, and we can wait for all of them to complete with the extension method .awaitAll. This suspends the Async.blocking context until all futures run to completion, and gives us back a Seq of their results. The return value is not interesting though; what we care about is buf at the end.
Async.Spawn
You may have noticed that Async.blocking gives you an Async.Spawn context (rather than just Async), which Future.apply requires. What is it?
Recall that Async, as a capability, gives you the ability to suspend the computation to wait for other values.
Async.Spawn adds a new capability on top: it allows you to spawn concurrent computations as well.
Getting a Spawn capability is actually very simple. You get an Async.Spawn capability from Async.blocking by default, and both Future.apply and Async.group give you an Async.Spawn capability sub-scoped from a parent Async scope.
Note, however, that most functions do not take an Async.Spawn context by default.
This is because Spawn lets you spawn computations that run as long as the Spawn scope is alive, which typically corresponds to the lexical scope of the function.
If functions take Async.Spawn, they are allowed to spawn futures that are still computing even after the function itself returns!
def fn()(using Async.Spawn) =
  val f = Future:
    useCPUResources() // hypothetical long-running computation
  0

Async.blocking:
  val n = fn()
  // f is still running at this point!!!
This is much more difficult to reason about, so the "sane" behavior is to not take Async.Spawn by default, and only "upgrade" it (with Async.group) inside a scope that will take care of cleaning up these "dangling" futures on return.
Most of the Gears library is structured this way, and we strongly recommend that you do the same in your function designs.
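A minimal sketch of that recommendation, assuming the Gears 0.2.0 API used in this book: sumOfSquares (a hypothetical function) asks only for a plain Async, upgrades it with Async.group to spawn one Future per element, and awaits all of them before the group returns, so no future outlives the call.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given

/** Squares each element in its own Future. Everything spawned inside
 *  Async.group is joined (or cancelled) before sumOfSquares returns. */
def sumOfSquares(xs: Seq[Int])(using Async): Int =
  Async.group:
    xs.map(x => Future(x * x)).awaitAll.sum

// Hypothetical entry point for trying it out.
def sumOfSquaresDemo(): Int =
  Async.blocking(sumOfSquares(1 to 4)) // 1 + 4 + 9 + 16
```

Callers of sumOfSquares only need an Async context, and can treat the function as an ordinary blocking call.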
Groups, Scoping and Structured Concurrency
If you've been following the book thus far, you've seen a lot of mentions of "an Async scope" and "sub-scoping". What exactly are those "scopes", and why do they matter?
That's exactly what this section is about!
tl;dr: Gears Structured Concurrency
We can sum up the content of this section as below. For more details, read on!
- Async contexts form a tree, rooted by Async.blocking, with each node associated with a body computation. All Async contexts only return after body completes.
- A child Async context can be created by either
  - Async.group, creating a direct child context "blocking" its caller, granting Async.Spawn, or
  - Future.apply, branching a concurrent computation linked to the current Async context.
- When body completes, all linked Futures are cancelled, and the Async context returns after the cancellation is completely handled.
With these three properties, we guarantee that calling using Async functions will properly suspend if needed, and clean up properly on return.
In Gears, Async scopes form a tree
Let's look at the only three[1] introduction points for an Async context in gears.async, and examine their signatures:
- Async.blocking's signature is
    object Async:
      def blocking[T](body: Async.Spawn ?=> T)(using AsyncSupport, Scheduler): T
  Just as the name suggests, Async.blocking gives you an Async context from thin air, but in exchange, whatever suspending computation happens in body causes blocking to block. Hence, blocking only returns when body gives back a T, which blocking happily forwards back to the caller. From the perspective of the call stack, blocking simply runs body to completion before returning. Pretty simple, isn't it? With blocking, you run suspending code in a scope that cleans up after itself, just like returning from a function!
- Async.group's signature is
    object Async:
      def group[T](inner_body: Async.Spawn ?=> T)(using Async): T
  Async.group takes an Async scope, and starts a child scope under it. This scope inherits the suspension and scheduling implementation of the parent Async, but gives inner_body a scope of its own. From the perspective of the caller (which has an Async context!), it is almost exactly the same as Async.blocking: Async.group also "blocks" (suspending if possible, of course) until inner_body completes. So far so good, and our scope stack is still a linear stack! Let's introduce concurrency...
- Finally, Future.apply's signature is
    object Future:
      def apply[T](future_body: Async.Spawn ?=> T)(using Async.Spawn): Future[T]
  Future.apply starts a concurrent computation that is bound to the given Async.Spawn scope. What does that mean? Looking at the call stack, Future.apply immediately returns (to body) a reference to the Future. In the meantime, a new Async.Spawn scope (the Future.apply scope) is created, running future_body. It is linked to the original Async.Spawn passed into Future.apply. We can now see that Async scopes, with both their associated body and the linked Future.apply Async scopes, form a tree!
To illustrate what exactly happens with linked scopes as the program runs, let's walk through the cases of interactions between the scopes!
[1] Technically, Task.start also introduces an Async context, but it is exactly equivalent to Future.apply(Task.run()).
Interactions between scopes
.await-ing a Future
When you .await a future, the Async scope used to .await on that Future will suspend the computation until the Future returns a value (i.e. its future_body is complete).
Note that the Async context passed into .await does not have to be the same Async.Spawn context that the Future is linked to!
However, for reasons we'll see shortly, active Futures will always be linked to one of the ancestors of body's Async context (or the same one).
Another thing to note is that, upon completion, the Future unlinks itself from the Async.Spawn scope.
This is not very important, as the Future already has a return value! But it is a good detail to notice when it comes to cleaning up.
Cleaning up: after body completes
What happens when the body of an Async context completes while some linked Future is still running?
In short: when body completes, all linked child scopes are .cancel()-ed, and Async will suspend until no more child scopes are linked before returning.
This is especially important!
It means that futures that are never .await-ed will get cancelled. Make sure to .await futures you want to complete!
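The following minimal sketch shows the consequence, assuming the Gears 0.2.0 API used in this book: a Future spawned inside Async.group but never awaited is cancelled while it sleeps, so its side effect never happens. The helper forgottenFutureDemo is hypothetical; an AtomicBoolean is used because the flag is written from another computation.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import java.util.concurrent.atomic.AtomicBoolean
import gears.async.*
import gears.async.default.given

def forgottenFutureDemo(): Boolean =
  val finished = AtomicBoolean(false)
  Async.blocking:
    Async.group:
      Future: // spawned but never awaited
        AsyncOperations.sleep(1_000)
        finished.set(true) // never reached: the sleep is cancelled first
      () // the group body ends here; the group cancels the Future and waits for it
  finished.get()
```

forgottenFutureDemo() should return false, and it returns promptly rather than after one second, because the group only waits for the cancellation to be handled.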
Cleaning up: when cancel is signalled
During a Future's computation, if it is .cancel-ed midway, the body keeps running (as there is no way to suddenly interrupt it without data loss!).
However, if .await is called with the Future's Async context (i.e. a request to suspend), it will immediately throw a CancellationException.
The body
may catch and react to this exception, e.g. to clean up resources.
The Future context will only attempt to unlink after body has returned, whether normally or by unwinding an exception.
Delaying cancellation with uninterruptible
The above .await behavior (throwing CancellationException) can be delayed within a Future by running the code under an uninterruptible scope.
This is useful for asynchronous clean-up:
Future:
  val resource = acquireResource() // acquireResource/useResource/cleanup are placeholders
  try
    useResource(resource)
  finally
    Async.uninterruptible:
      resource.cleanup()/*(using Async)*/
Without uninterruptible, suspensions done by resource.cleanup would again throw CancellationExceptions.
Working with multiple Futures
When writing code that performs computations concurrently, it is common to spawn multiple futures and then await them together, with some particular logic.
Gears provides some tools to deal with multiple futures, both for a statically known number and for an unknown number of futures (i.e. if you have a Seq[Future[T]]).
Combining two futures
Future.zip takes two futures Future[A] and Future[B] and returns a Future[(A, B)], completing when both input futures do.
Async.blocking:
  val a = Future(1)
  val b = Future("one")
  val (va, vb) = a.zip(b).await // (1, "one")
If one of the input futures throws an exception, it gets re-thrown as soon as possible (i.e. without suspending until the other completes).
Future.or takes two Future[T]s and returns another Future[T], completing with the first value that one of the two futures succeeds with.
If both throw, the exception that was thrown last is re-thrown.
Async.blocking:
  val a = Future(1)
  val b = Future:
    AsyncOperations.sleep(1.hour)
    2
  val c = Future:
    throw Exception("explode!")
  a.or(b).await // returns 1 immediately
  b.or(a).await // returns 1 immediately
  c.or(a).await // returns 1 immediately
  a.orWithCancel(b).await // returns 1 immediately, and b is cancelled
  c.or(c).await // throws
Future.orWithCancel is a variant of Future.or where the slower Future is cancelled.
Combining a Sequence of futures
Seq[Future[T]].awaitAll takes a Seq[Future[T]] and returns a Seq[T] when all futures in the input Seq are completed.
The results are in the same order as the corresponding Futures in the original Seq.
If one of the input futures throws an exception, it gets re-thrown as soon as possible (i.e. without suspending until the others complete).
Conceptually, it behaves like zipping all of the futures together and awaiting the combined future, but it is more performant.
Async.blocking:
  val a = Future(1)
  val b = Future:
    AsyncOperations.sleep(1.second)
    2
  val c = Future:
    throw Exception("explode!")
  Seq(a, b).awaitAll // Seq(1, 2), suspends for a second
  Seq(a, b, c).awaitAll // throws immediately
awaitAllOrCancel is a variant where all futures are cancelled when one of them throws.
Seq[Future[T]].awaitFirst takes a Seq[Future[T]] and returns a T when the first future in the input Seq succeeds.
If all input futures throw an exception, the last exception is re-thrown.
It is a more performant version of futures.reduce(_.or(_)).await.
Async.blocking:
  val a = Future(1)
  val b = Future:
    AsyncOperations.sleep(1.second)
    2
  val c = Future:
    throw Exception("explode!")
  Seq(a, b).awaitFirst // returns 1 immediately
  Seq(a, b, c).awaitFirst // returns 1 immediately
awaitFirstWithCancel is a variant where all other futures are cancelled when one succeeds.
Async.select
Async.select works similarly to awaitFirst, but allows you to attach a body to handle the returned value, so the futures being raced do not have to be of the same type (as long as the handlers return the same type).
Compared to creating more futures, Async.select also guarantees that exactly one of the handlers is run, so you can be more confident putting side effects into the handlers.
Async.blocking:
  val a = Future(1)
  val b = Future("one")
  val v = Async.select(
    a.handle: va =>
      s"number $va was returned",
    b.handle: vb =>
      s"string `$vb` was returned"
  )
  println(v) // either "number 1 was returned", or "string `one` was returned"
Async.select takes SelectCase* varargs, so you can also .map a sequence of Futures.
Async.blocking:
  val futs = (1 to 10).map(i => Future(i))
  val number = Async.select(futs.map(_.handle(v => s"$v returned"))*)
Future.Collector
Future.Collector takes a sequence of Futures and exposes a ReadableChannel[1] returning the Futures as they are completed.
Collector allows you to manually implement handling of multiple futures as they arrive, should you need more complex behaviors than the ones provided.
For example, here is a simplified version of .awaitFirst:
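import scala.annotation.tailrec
import scala.util.{Failure, Success}

def awaitFirst[T](futs: Seq[Future[T]])(using Async): T =
  val len = futs.length
  val collector = Future.Collector(futs*)
  @tailrec def loop(failed: Int): T =
    // read() returns Either[Closed, Future[T]]; the channel is not expected
    // to close before all len futures have been delivered
    val fut = collector.results.read().getOrElse(throw IllegalStateException("collector closed"))
    fut.awaitResult match
      case Success(value) => value
      case Failure(exc) =>
        if failed + 1 == len then throw exc
        else loop(failed + 1)
  loop(0)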
MutableCollector is a Collector that also lets you add more Futures after creation.
[1] Learn more about channels in a future chapter.
Supervising with Retries and Timeouts
With the ability to sleep non-blockingly, one might immediately think about implementing supervision of tasks with retrying behavior upon failure.
Gears provides some building blocks, as well as interfaces to customize retry policies.
Timeout
The simplest supervision function is withTimeout, which takes a duration and an async block (Async ?=> T). It runs the async block racing against the timeout, and throws TimeoutException if the timeout runs out before the async block completes, cancelling the async block afterwards.
Async.blocking:
  withTimeout(60.seconds):
    val body = request.get("https://google.com")
    // ...
withTimeoutOption is a variant that wraps the output in Option[T] instead, returning None on timeout.
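Both outcomes of withTimeoutOption fit in one minimal sketch. It assumes the signature implied by the text above (a FiniteDuration followed by an Async ?=> T block, returning Option[T]) together with the Gears 0.2.0 sleep overload taking a duration; timeoutDemo is a hypothetical helper.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import scala.concurrent.duration.*
import gears.async.*
import gears.async.default.given

def timeoutDemo(): (Option[String], Option[String]) =
  Async.blocking:
    // Finishes well within its timeout: the result is wrapped in Some.
    val fast = withTimeoutOption(1.second):
      AsyncOperations.sleep(10.millis)
      "fast"
    // Exceeds its timeout: the block is cancelled and None is returned.
    val slow = withTimeoutOption(100.millis):
      AsyncOperations.sleep(10.seconds)
      "slow"
    (fast, slow)
```

Choosing the Option variant lets the caller decide how to react to a timeout with a plain match instead of catching TimeoutException.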
Retry
Similar to withTimeout, Retry takes a block and executes it, "blocking" the caller until its policy has finished.
Retry allows you to specify:
- when to consider the block as complete (until success, until failure, never)
- how many consecutive failures should fail the policy
- how much to delay in between attempts, through the Delay trait
  - Some delay policies are provided, such as a constant delay and an exponential backoff algorithm.
Async.blocking:
  Retry
    .untilSuccess
    .withMaximumFailures(5)
    .withDelay(Delay.exponentialBackoff(maximum = 1.minute, starting = 1.second, jitter = Jitter.full)):
      val body = request.get("https://google.com")
      // ...
Inter-future communication with Channels
For immutable data, it is easy and recommended to directly capture references to them from within Futures. However, to share mutable data or to communicate between concurrent computations, Gears recommends following Effective Go's slogan on sharing:
Do not communicate by sharing memory; instead, share memory by communicating.
To facilitate this, Gears provides channels, a simple means of pipe-style communication between computations.
Channels
A Channel can be thought of as a pipe between senders and readers, with or without a buffer in between. The latter is what differentiates the types of channels provided by Gears.
Nevertheless, all channels provide the following methods:
- send takes a value T and attempts to send it through the channel, suspending until the value has been received or stored in the channel's buffer (if one exists). If the channel is closed, send will throw ChannelClosedException.
- read suspends until a value is available through the channel, consuming it from the channel. read returns Either[Closed, T], with Left(Closed) returned in the obvious case (the channel has been closed).
- close simply closes the channel.
Channels provide the following guarantees:
- Two sequential sends (one send followed by another send, not done concurrently) will be read in the same order.
- An item that is sent is always read exactly once, unless it was in the channel's buffer when the channel was closed.
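The three methods and the ordering guarantee can be exercised together in a minimal sketch, assuming the Gears 0.2.0 channel API used elsewhere in this book: a Future sends three values through a SyncChannel and then closes it, while the caller reads until it gets a Left. The helper channelDemo is hypothetical.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import scala.annotation.tailrec
import gears.async.*
import gears.async.default.given

def channelDemo(): List[Int] =
  Async.blocking:
    val ch = SyncChannel[Int]()
    Future:
      for i <- 1 to 3 do ch.send(i) // each send waits for a matching read
      ch.close()                    // signal that no more values will come
    @tailrec def drain(acc: List[Int]): List[Int] =
      ch.read() match
        case Right(v) => drain(v :: acc)
        case Left(_)  => acc.reverse // Left(Closed): the channel was closed
    drain(Nil)
```

Since the sends are sequential, channelDemo() should return List(1, 2, 3).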
Types of channels
Gears provides 3 types of channels:
- SyncChannels are channels without a buffer, so every send will suspend until a corresponding read succeeds. Since there are no buffers, an item that is sent is always read exactly once.
- BufferedChannels are channels with a buffer of a fixed size. A send succeeds immediately if there is space in the buffer, and otherwise suspends until there is space for it. When cancelled, items in the buffer are dropped.
- UnboundedChannels are channels with a buffer that can grow indefinitely. A send always succeeds immediately. In fact, UnboundedChannel provides sendImmediately, which never suspends. When cancelled, items in the buffer are dropped.
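The buffering behavior can be observed without any concurrent reader, as in this minimal sketch. It assumes the Gears 0.2.0 constructors BufferedChannel[T](size) and UnboundedChannel[T](); the helper bufferDemo is hypothetical.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given

def bufferDemo(): (Int, Int) =
  Async.blocking:
    val buffered = BufferedChannel[Int](2)
    buffered.send(1) // room in the buffer: returns without waiting for a reader
    buffered.send(2) // buffer is now full; a third send would suspend here
    val unbounded = UnboundedChannel[Int]()
    for i <- 1 to 1000 do unbounded.sendImmediately(i) // never suspends
    // Reads return Either[Closed, Int]; neither channel is closed here.
    (buffered.read().getOrElse(-1), unbounded.read().getOrElse(-1))
```

With a SyncChannel, the first send in this single computation would suspend forever, since no reader exists yet.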
Restricted channel interfaces
When writing functions taking in channels, it is often useful to restrict them to be read-only or send-only.
Gears' Channel trait is composed of ReadableChannel, SendableChannel and Closeable, so any channel can be upcast to one of these three interfaces.
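As a minimal sketch of that restriction (the Gears 0.2.0 types as used in the prime sieve example later in this chapter), sumAll can only read and produce can only send and close. A SyncChannel is used so that no buffered item can be dropped on close. All three function names are hypothetical.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import java.io.Closeable
import scala.annotation.tailrec
import gears.async.*
import gears.async.default.given

// Can only read: this function cannot send into or close `in`.
def sumAll(in: ReadableChannel[Int])(using Async): Int =
  @tailrec def loop(acc: Int): Int =
    in.read() match
      case Right(n) => loop(acc + n)
      case Left(_)  => acc // channel closed: no more values
  loop(0)

// Can send and close, but cannot read back what it sent.
def produce(n: Int)(out: SendableChannel[Int] & Closeable)(using Async): Unit =
  for i <- 1 to n do out.send(i)
  out.close()

def restrictedDemo(): Int =
  Async.blocking:
    val ch = SyncChannel[Int]() // a full Channel, upcast at each call site
    Future(produce(10)(ch))
    sumAll(ch)
```

restrictedDemo() should sum 1 to 10, returning 55. The restricted parameter types document which side of the pipe each function owns.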
Async.select with channels
Async.select can be used with channels. To do so, use .readSource for reading and .sendSource(T) for sending, before attaching .handle and providing a handler.
Async.blocking:
  val fut = Future:
    AsyncOperations.sleep(1.minute)
    10
  val ch = SyncChannel[Int]()
  val readFut = Future:
    ch.read().right.get + 1
  Async.select(
    fut.handle(x => println(s"fut returned $x")),
    ch.sendSource(20).handle:
      case Left(Closed) => throw Exception("channel closed!?")
      case Right(()) =>
        println("sent from channel!")
        println(s"readFut returned ${readFut.await}")
  )
Similar to future handlers, Async.select guarantees that exactly one of the handlers is run.
Not only that, it also guarantees that only the channel event whose handler is run will go through!
For example, in the following snippet, exactly one channel is consumed from in each loop iteration:
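Async.blocking:
  val chans = (1 to 2).map: i =>
    val chan = UnboundedChannel[Int]()
    chan.sendImmediately(i)
    chan
  for i <- 1 to 2 do
    val read = Async.select(
      // readSource yields Either[Closed, Int]; neither channel is closed here
      chans(0).readSource.handle(_.toOption.get),
      chans(1).readSource.handle(_.toOption.get),
    )
    println(read) // prints 1 then 2, or 2 then 1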
It is possible to mix reads and sends within one Async.select, or to mix channel operations with futures!
Channel in Use: Actor pattern
One of the common patterns of using channels is to write functions that continuously take inputs from one channel, process them, and send their output into another channel. This is sometimes referred to as an Actor, or a Communicating Sequential Process.
Let's look at one of the examples, inspired by Rob Pike's talk on concurrency patterns in Go. This is an implementation of a concurrent prime sieve.
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
//> using nativeMode "release-full"
import java.io.Closeable
import scala.annotation.tailrec
import scala.util.boundary
import gears.async.*
import gears.async.default.given

// Sender that owns the channel
type OwnedSender[T] = SendableChannel[T] & Closeable

// Send the sequence 2, 3, 4, ..., until and then close the channel
def generate(until: Int)(ch: OwnedSender[Int])(using Async) =
  for i <- 2 to until do ch.send(i)
  ch.close()

// Filter out multiples of k
def sieve(
    k: Int
)(in: ReadableChannel[Int], out: OwnedSender[Int])(using Async) =
  @tailrec def loop(): Unit =
    in.read() match
      case Left(_) => ()
      case Right(n) =>
        if n % k != 0 then out.send(n)
        loop()
  loop()
  out.close()

@main def PrimeSieve(n: Int) =
  Async.blocking:
    // set up sieves
    val inputChan = SyncChannel[Int]()
    // start generating numbers
    Future(generate(n)(inputChan))

    // Collect answers
    @tailrec def loop(input: ReadableChannel[Int]): Unit =
      input.read() match
        // no more numbers
        case Left(_) => ()
        // new prime number
        case Right(n) =>
          println(s"$n is prime")
          // filter out multiples of n
          val chan = SyncChannel[Int]()
          Future(sieve(n)(input, chan))
          loop(chan)

    loop(inputChan)
Some notable points:
- generate and sieve are both using Async functions: they block the caller until their work has finished. Since we want to run them concurrently as actors, we spawn them with Future.apply.
- Both are given channels to write into, and the permission to close them. This is a common pattern, useful when your channels have a single sender and multiple readers[1]. In such cases, it is common for the sender to signal that there are no more inputs by simply closing the channel. All readers are then immediately notified, and in this case (where readers are senders themselves in a pipeline), they can forward this signal by closing their respective channels.
[1] In the case where multiple senders are involved, the logic is much more complicated. You might want to look into ChannelMultiplexer.
Sources as a Concurrency Primitive
With async functions, active Futures and channels, we should be able to write our programs and make them easy to reason about with structured concurrency.
However, we often use concurrency to deal with asynchronous external events, such as file I/O, networking, sockets, ... From the program's point of view, this is completely unstructured: often you ask for an operation, and at some point in the future (out of our control), we get back a response. This does not mix well with our current concepts, where callers always manage their own execution flow and structure.
To deal with such asynchronous events, Gears provide some interfaces to be used as bridges between the unstructured world and
structured concurrency programs.
These come in the forms of Async.Source
s.
Callbacks and push
The most basic way of requesting an asynchronous operation is through a callback.
Let's illustrate this with a very simple example: reading a file.
```scala
object File:
  def readString(path: String)(callback: Either[Error, String] => Unit): Cancellable
```
Suppose we have a non-blocking API for reading a file at `path`. The content of the file will be read into a `String`, which is passed into `callback` (if errors come up, they also get passed into `callback`).
Rather than the operation result, the caller of `readString` receives a request object, which has a `cancel` method to attempt to cancel the read operation.
The flow of callback-based operations is as follows:
- `body` calls `File.readString(path)(callback)`, getting back a `Cancellable` immediately.
- `body` does not control when or how `callback` is called, and has no provided way to interact with `callback`.

  In reality, should `body` want to wait for `callback`'s result, it can either
  - put all handling logic past `readString` into `callback`, turning `callback` into a full continuation, or
  - introduce blocking/suspending through some synchronization primitive (channel, mutex).

  Without `Async`, the latter option introduces blocking code, which effectively nullifies the advantage of using a non-blocking API in the first place. Therefore, traditional languages with callbacks (Node.js is an infamous example) chose the former, creating the problem of callback hell.
- At some time in the future, `callback` is called with the result. This might happen while `body` is still executing, or after. Depending on parallelism capabilities, it may be executed in parallel to `body`.
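To make the two options concrete, here is a small sketch in plain Scala (no Gears yet). `FakeFile.readString` is a hypothetical stand-in for the callback API above: it serves strings from an in-memory map on a background thread and, for brevity, returns `Unit` instead of a `Cancellable`. `readBoth` follows the first option (nesting continuations), while `readBlocking` follows the second one and parks the calling thread on a `CountDownLatch`.

```scala
//> using scala 3.4.0
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

object FakeFile:
  // Hypothetical in-memory "file system".
  private val contents = Map("a.txt" -> "Hello", "b.txt" -> ", world!")

  // Hypothetical callback-style API: the callback runs later, on another thread.
  def readString(path: String)(callback: Either[Error, String] => Unit): Unit =
    val t = Thread(() => callback(contents.get(path).toRight(Error(s"no such file: $path"))))
    t.setDaemon(true)
    t.start()

// Option 1: all remaining logic moves into nested callbacks (continuation-passing style).
def readBoth(done: Either[Error, String] => Unit): Unit =
  FakeFile.readString("a.txt") {
    case Left(err) => done(Left(err))
    case Right(a) =>
      FakeFile.readString("b.txt") {
        case Left(err) => done(Left(err))
        case Right(b)  => done(Right(a + b))
      }
  }

// Option 2: block the calling thread until the callback has fired.
def readBlocking(path: String): Either[Error, String] =
  val result = AtomicReference[Either[Error, String]]()
  val latch = CountDownLatch(1)
  FakeFile.readString(path) { r =>
    result.set(r)
    latch.countDown()
  }
  latch.await() // the thread is stuck here until the callback runs
  result.get()
```

Each extra step in `readBoth` adds another level of nesting, while `readBlocking` keeps the code flat at the cost of a blocked thread.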
How Async makes callbacks easier
In Gears, however, we have `Async` computations: computations that can suspend to wait for a future value! This means that, given the same API, we can do the following:
- Suspend the current `Async` context.
- Issue the asynchronous request, passing the resumption of the `Async` context as the callback.

And, in essence, this is exactly what `Async.await` does!
Let's now look at the fully-expanded signature of `Async.await`:

```scala
trait Async:
  def await[T](src: Source[T]): T
```
With our knowledge of the Async-callback flow, we know that `src` would be some callback-taking future value. `Async.await` would then do the suspend / set-resume-callback procedure, and return the output of `src` as if we continued from within the callback.
This suspension/resume mechanism is how `Async` lets you await concurrent `Future`s (which are also `Source`s!) without having to be inside a `Future` yourself, as monadic-style code would require.
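For comparison, here is a two-file read in direct style. It reuses a hypothetical in-memory `FakeFile.readString` callback API and wraps it with `Future.withResolver`, which is covered in the section on passive Futures. Each `.await` suspends the `Async` context and resumes it from the callback, so the code reads top to bottom with no nesting. All names besides the Gears API are illustrative assumptions.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given

object FakeFile:
  // Hypothetical in-memory "file system".
  private val contents = Map("a.txt" -> "Hello", "b.txt" -> ", world!")

  // Hypothetical callback-style API: the callback runs later, on another thread.
  def readString(path: String, callback: Either[Error, String] => Unit): Unit =
    val t = Thread(() => callback(contents.get(path).toRight(Error(s"no such file: $path"))))
    t.setDaemon(true)
    t.start()

  // Gears wrapper: the callback resolves a passive Future.
  def readString(path: String): Future[Either[Error, String]] =
    Future.withResolver: resolver =>
      readString(path, result => resolver.resolve(result))

// Direct style: two sequential reads, no nested callbacks.
def readBoth()(using Async): Either[Error, String] =
  for
    a <- FakeFile.readString("a.txt").await // suspends until the first callback fires
    b <- FakeFile.readString("b.txt").await // then until the second one fires
  yield a + b
```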
The Source trait
Now that we have an idea of what a `Source` is, we can look at how `Source` is defined:

```scala
trait Source[+T]:
  def onComplete(listener: Listener[T]): Unit
  def poll(listener: Listener[T]): Boolean
  def poll(): Option[T]
  def dropListener(listener: Listener[T]): Unit
  inline def awaitResult(using Async): T = Async.await(this)

// Included for convenience, simplified...
trait Listener[-T]:
  def complete(item: T, origin: Source[T]): Unit
```
- `onComplete` does exactly what you expect: when an item is available, `listener` is called with the item. `listener` also gets a reference to the `Source` itself, in case it was passed to multiple `Source`s and needs to distinguish which source the item came from.
- `poll(listener)` works as a non-blocking poll: if an item is immediately available, it calls `listener` with it immediately. Otherwise, it immediately returns `false`, which usually means you'd want to pass your listener to `onComplete` instead.

  If an item is all you need, `poll()` is a much more convenient overload: you just get the item back, or `None` if it was not immediately available.
- `dropListener` signals that `listener` no longer needs to be called. This usually signals the cancellation of the value receiver.

  Note that this does not cancel the `Source` itself: `Source` does not have a concept of cancellation! Not all asynchronous operations are cancellable.
Finally, `awaitResult` is just a convenience postfix method for `Async.await`.
In special cases where `T` is `Either` or `Try` (`Future[T]` is a `Source[Try[T]]`), we have `.await` as a convenience postfix method that also unwraps the value!
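A small sketch, assuming Gears 0.2.0 on the JVM, that exercises these methods on a `Future` (a `Source[Try[T]]`). `Recorder` and `demo` are hypothetical names. `Recorder` implements the `Listener` trait directly with `lock = null`, meaning it never refuses an item. Since a completed `Future` is a persistent source, both `poll()` and a late `onComplete` see the stored result right away.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given
import java.util.concurrent.atomic.AtomicReference
import scala.util.{Success, Try}

// A listener that simply records the last item it receives.
class Recorder[T] extends Listener[T]:
  val lock = null // never refuses an item
  val collected = AtomicReference[Option[T]](None)
  def complete(item: T, origin: Async.Source[T]): Unit = collected.set(Some(item))

def demo()(using Async.Spawn) =
  val ok = Future(6 * 7)
  val failed = Future[Int](throw IllegalStateException("boom"))

  val unwrapped: Int = ok.await            // Future-specific: unwraps the Try (throws on failure)
  val wrapped: Try[Int] = ok.awaitResult   // same as Async.await(ok)
  val polled: Option[Try[Int]] = ok.poll() // already resolved, so this is Some(...)

  val rec = Recorder[Try[Int]]()
  ok.onComplete(rec) // persistent source: the late listener is resolved immediately

  val failedResult: Try[Int] = failed.awaitResult // a Failure value, nothing is thrown
  (unwrapped, wrapped, polled, rec.collected.get(), failedResult)
```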
Persistent and Ephemeral Sources
So far, things seem clear when we have one `Source` and one `Listener`.
What happens when we pass multiple listeners to a `Source`?
The answer depends a lot on what the source represents:
- Should we write the above `readString` function in terms of `Source` instead:

  ```scala
  object File:
    def readString(path: String): Source[Either[Error, String]] & Cancellable
  ```

  then the `Source` returned represents a single asynchronous request. This request returns once, so no matter how many listeners are attached to it, they should get the same value every time.

  We call Sources where all listeners get the same value, resolved at the same time¹, a persistent source. In particular, if a source is both persistent and cancellable, it is a `Future`. We shall see the "real" interface of a `Future` in the next section.

  ```scala
  // "correct" definition
  object File:
    def readString(path: String): Future[Either[Error, String]]
  ```
- Suppose we want to capture signals sent to our program. We might use a signal handler like this:

  ```scala
  object Signals:
    def capture(signals: Signal*): Source[Signal]
  ```

  in which our `Source` represents a subscription to the system process' signal handler. This will deliver multiple signals to our program, and each listener attached to this `Source` should get one, possibly different, signal. We call Sources that deliver possibly different values to listeners an ephemeral source.

  Another example of an ephemeral source is `.readSource` of a channel.
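The difference is easy to observe with a channel and a `Future`. In this sketch (assuming Gears 0.2.0; `observe` is a hypothetical name), two awaits on the same `readSource` consume two different items (ephemeral), while two awaits on the same `Future` see the same value (persistent). The buffer size of 2 is an arbitrary choice that lets both sends complete without a reader.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given

def observe()(using Async.Spawn) =
  val ch = BufferedChannel[Int](2)
  ch.send(1)
  ch.send(2)
  val src = ch.readSource
  val first = src.awaitResult  // this listener consumes 1
  val second = src.awaitResult // a new listener on the same source gets a different item: 2

  val fut = Future(10)
  val a = fut.await
  val b = fut.await            // a persistent source hands out the same value again
  (first, second, a, b)
```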
Active and Reactive Sources
Activeness and reactivity of Sources refer to whether values are resolved to the Source regardless of interactions with it (adding/removing listeners).
Some sources, like an active `Future` or an asynchronous request like `File.readString`, will complete and resolve with a value whether or not a listener is attached to them.
Other sources, like `Channel.readSource`, do not do anything on creation. They only attempt to consume a value from the channel when a listener is attached.
In practice, this matters when one decides to create a `Source` but not await it.
There is no way to differentiate the two kinds of sources from the signature alone, hence creating a `Source` without awaiting it should be considered an anti-pattern.
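A sketch of the difference, assuming Gears 0.2.0 on the JVM (`observe` is a hypothetical name). Merely obtaining `readSource` does not take anything out of the channel, whereas a `Future` starts running as soon as it is created, whether or not anyone awaits it. The `CountDownLatch` is only there to observe the side effect: awaiting a latch blocks a thread and is not something to do in real Gears code.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given
import java.util.concurrent.{CountDownLatch, TimeUnit}

def observe()(using Async.Spawn) =
  // Reactive: obtaining readSource consumes nothing until a listener is attached.
  val ch = BufferedChannel[Int](1)
  ch.send(7)
  val unused = ch.readSource
  val stillThere = ch.read() // the item is still in the channel

  // Active: the Future starts running even though nobody awaits it.
  val started = CountDownLatch(1)
  Future(started.countDown())
  val ran = started.await(5, TimeUnit.SECONDS)
  (stillThere, ran)
```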
¹ It is hard to say that concurrent listeners are resolved "at the same time". We usually reason about this in terms of subsequent listeners: if one listener is attached to a persistent `Source` after another has been resolved, the second listener should be resolved immediately with the same value. Consequently, `poll`ing this `Source` will always give the same value back immediately.
Passive Futures and Promises
As mentioned in the last section, the `Future` trait represents a persistent source that is also cancellable:

```scala
trait Future[+T] extends Source[Try[T]], Cancellable
```
As mentioned before, `Future`s can be active or passive¹. We have delved deeply into active `Future`s, so this section is all about passive `Future`s.

¹ Even passive `Future`s are considered active `Source`s by our definition.

So what are passive `Future`s? Gears makes a distinction between an active and a passive `Future` based on whether its computation is within or outside Gears' structured concurrency. Active `Future`s are actively driven by Gears, while passive `Future`s can be completed from outside of Gears' machinery.
Typically, passive `Future`s are used as a straightforward way to wrap callback-style functions in a Gears interface. We've seen how the interface is created in the last section; we shall now see how we can provide the implementation.
Future.withResolver: JS-style callback-to-future conversion
Here is an example of how to implement a `Future`-returning version of `readString`, given the callback one:
```scala
object File:
  def readString(path: String, callback: Either[Error, String] => Unit): Cancellable =
    // provided
    ???

  def readString(path: String): Future[Either[Error, String]] =
    Future.withResolver: resolver =>
      // the callback resolves the passive Future
      val cancellable = readString(path, result => resolver.resolve(result))
      // forward cancellation of the Future to the underlying request
      resolver.onCancel: () =>
        cancellable.cancel()
        resolver.rejectAsCancelled()
```
- `Future.withResolver` creates a passive `Future` whose completion is managed by the `Resolver` parameter passed into `body`.

  The role of `body` is to set up callbacks that will complete the `Future` by resolving it with a value or failing it with an exception.

  In case the callback function is cancellable, `body` can use `Resolver.onCancel` to forward the cancellation of the `Future` to the callback function.
- Unlike `Future.apply`, `withResolver` runs `body` directly, blocking the caller until `body` is complete. The `Future` is returned once `body` has finished setting it up.
If you are familiar with JavaScript, this is almost the same interface as its `Promise` constructor.
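To see both completion paths in action, here is a self-contained variant built on `java.util.Timer` instead of file I/O, assuming Gears 0.2.0. `after` is a hypothetical callback API that delivers `value` after `millis` milliseconds and returns a function that cancels the pending delivery; `delayed` wraps it with `Future.withResolver` in the same way as `readString` above.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given
import java.util.{Timer, TimerTask}

val timer = Timer(true) // daemon thread, so it does not keep the JVM alive

// Hypothetical callback API: calls `callback(value)` after `millis` ms.
// Returns a function that cancels the pending call.
def after[T](millis: Long, value: T)(callback: T => Unit): () => Unit =
  val task = new TimerTask { def run(): Unit = callback(value) }
  timer.schedule(task, millis)
  () => task.cancel()

def delayed[T](millis: Long, value: T): Future[T] =
  Future.withResolver: resolver =>
    val cancel = after(millis, value)(v => resolver.resolve(v))
    resolver.onCancel: () =>
      cancel()                     // stop the timer task
      resolver.rejectAsCancelled() // complete the Future with a cancellation failure
```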
Converting between Scala and Gears futures
Since Scala's `scala.concurrent.Future` runs on its own `ExecutionContext`, it is considered outside of Gears' machinery, and hence can only be converted into a passive Gears `Future`.
This can be done by importing `ScalaConverters` and using the extension method `.asGears` on the `Future`.
Note that it requires an `ExecutionContext`, in order to run the code that completes the Gears `Future`.
Cancelling this `Future` only forces it to return `Failure(CancellationException)`, as Scala Futures cannot be cancelled externally.
Converting from a Gears `Future`, passive or active, to a Scala `Future` is also possible through the `.asScala` extension method.
Scala Futures created from active Gears futures will return `Failure(CancellationException)` once the Gears futures go out of scope.
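A minimal round trip in both directions, assuming Gears 0.2.0 and the global `ExecutionContext` (`fromScala` and `toScala` are hypothetical names). Scala's `Future` is imported under the alias `ScalaFuture` so it does not clash with Gears' `Future`.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given
import gears.async.ScalaConverters.*
import scala.concurrent.{Await, ExecutionContext, Future as ScalaFuture}
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

// Scala -> Gears: the result is a passive Gears Future.
def fromScala()(using Async): Int =
  ScalaFuture(21 * 2).asGears.await

// Gears -> Scala: the active Gears Future stays linked to the caller's scope.
def toScala()(using Async.Spawn): ScalaFuture[Int] =
  Future(6 * 7).asScala
```

The Scala future returned by `toScala` is awaited while its Gears counterpart is still in scope; awaiting it after the enclosing `Async` scope has ended would yield the cancellation failure described above.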
Manual completion of Futures with Promise
In case you need a more complicated flow than a callback, Gears provides a more manual way of creating passive `Future`s, through `Future.Promise`.
Simply put, it is a `Future` that you can complete manually with a value, through its `complete` method.
Because there is no structure to a `Promise`, it is never recommended to return one directly. It should be upcast to `Future` (explicitly or with `.asFuture`) before being returned.
One cannot forward the cancellation of a `Promise`.
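A minimal sketch, assuming Gears 0.2.0: `computedElsewhere` is a hypothetical helper that completes a `Promise` from a plain thread, outside Gears, and only exposes it as a `Future`.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given
import scala.util.Success

def computedElsewhere(): Future[Int] =
  val promise = Future.Promise[Int]()
  // Completed by an ordinary thread, outside Gears' structured concurrency.
  val t = Thread(() => promise.complete(Success(42)))
  t.setDaemon(true)
  t.start()
  promise.asFuture // callers only get the read side
```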
Select and Race
`Async.select` works on both channels and futures because it operates on `Source`s.
Its behavior generalizes to any `Source`s: exactly one item is consumed from one of the sources, and the corresponding handler is run.
Async.select machinery
This behavior is achieved by passing into the `Source`s a special `Listener` that only accepts the first item resolved, and rejects all others.
Learn more about such fallible listeners in the next section.
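A small deterministic sketch, assuming Gears 0.2.0 and its `handle` extension for building select cases (`pick` is a hypothetical name). The channel already holds an item while the `Future` never resolves, so `select` consumes exactly that one item and runs only the channel's handler.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given

def pick()(using Async): String =
  val ch = BufferedChannel[Int](1)
  ch.send(1)                                 // the channel already has an item
  val never = Future.Promise[Int]().asFuture // a Future that never resolves
  Async.select(
    ch.readSource.handle {
      case Right(v) => s"channel: $v"
      case Left(_)  => "channel closed"
    },
    never.handle(r => s"future: $r"),
  )
```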
Async.race
Under the hood, `Async.select` uses a more general method to combine multiple `Source`s into one with a racing behavior:

```scala
object Async:
  def race[T](sources: Source[T]*): Source[T]
```
`race` takes multiple sources and returns a new ephemeral, reactive `Source` that:
- once a listener is attached, wraps the listener with a one-time check and forwards it to all sources;
- when one source resolves this listener with a value, invalidates this listener on all other sources.

Note that `Source.dropListener` is inherently racy: one cannot reliably use `dropListener` to guarantee that a listener will not be run.
This is the main motivation for introducing fallible listeners.
`Async.raceWithOrigin` is a variant of `race` that also returns which of the origin sources the item comes from.
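A sketch of `race` on two `Future`s, assuming Gears 0.2.0 (`first` is a hypothetical name). Because `Future[Int]` is a `Source[Try[Int]]`, the raced source yields a `Try[Int]`. Being reactive, it does nothing until `awaitResult` attaches a listener.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given
import scala.util.{Success, Try}

def first()(using Async.Spawn): Try[Int] =
  val never = Future.Promise[Int]().asFuture // never resolves
  val quick = Future(7)
  Async.race[Try[Int]](never, quick).awaitResult
```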
Calling listeners and Source.transformValuesWith
From the description of `race`, you might notice a general pattern for transforming `Source`s with a `.map`-like behavior:
- Given a transformation function `f: T => U`, one can transform a `Source[T]` into a `Source[U]` by creating a `Source` that...
  - given a listener `l: Listener[U]`, creates a `Listener[T]` that applies `f` before passing the item to `l`...
  - ... and passes that wrapping listener back to the `Source[T]`.

And this is exactly what `Source.transformValuesWith` does.
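A tiny sketch, assuming Gears 0.2.0 (`doubled` is a hypothetical name). The transformation `_.map(_ * 2)` is deliberately cheap: it runs inside the listener that the original `Future` calls, not in a task of its own.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given
import scala.util.{Success, Try}

def doubled()(using Async.Spawn): Try[Int] =
  val f = Future(21) // a Source[Try[Int]]
  // The function is applied by whatever thread completes `f`, just before the item reaches the listener.
  f.transformValuesWith(_.map(_ * 2)).awaitResult
```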
This looks like a very useful method, so why isn't it a basic concept with a familiar name (`.map`)?
The devil lies in the details, specifically in how `Source` notifies its `Listener`s.
Notice that there is no `ExecutionContext` or `Async.Scheduler` passed into `onComplete`.
This is because a `Source` often represents values that come from outside the program's control, possibly produced on another thread pool. We do not want to introduce a "middle ground" where spawning tasks is possible but those tasks implicitly live outside of structured concurrency.
With this limitation, we have no control over how much parallelism or how many resources a `Source` uses when calling a `Listener`.
In Gears, we attempt to contain this limitation by asking `Listener` implementations to be as lightweight as possible.
In most cases, a `Listener` just schedules the resumption of a suspended `Async` context, so it is not a problem.
However, `transformValuesWith` allows one to easily run unbounded computation within the transform function `f`.
This is dangerous, so `transformValuesWith` has a long, tedious, greppable name to show its gnarly side.
I want to transform a Source, what do I do?
Explicitly spawn a `Future` that transforms each item, and communicate with it through a `Channel`:
```scala
val ch = SyncChannel[U]()
Future:
  while true do
    val t = source.awaitResult
    ch.send(f(t))
```
This is, however, not always what you want: think about whether you want `f` and `send` to run at the same time, or `awaitResult` and `send`, what to do when `f` throws, and so on.
Most of the time this is very situation-specific, so Gears does not provide a simple solution.
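As one possible set of choices, here is a channel-to-channel pipeline in the style of the prime sieve, assuming Gears 0.2.0. `mapInto` is a hypothetical helper, not part of Gears. `f` runs in the spawned `Future` (never inside a listener), items are processed one at a time, and the output channel is closed when the input closes or when `f` throws, so readers are never left waiting.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given
import scala.annotation.tailrec

def mapInto[T, U](in: ReadableChannel[T], out: Channel[U])(f: T => U)(using Async): Unit =
  @tailrec def loop(): Unit =
    in.read() match
      case Left(_) => ()    // input closed: no more items
      case Right(t) =>
        out.send(f(t))      // f and send run one after the other, in this Future
        loop()
  try loop()
  finally out.close()       // propagate "no more items", even if f throws
```

A usage example: a producer sends three numbers and closes its channel, and the reader sees the mapped values followed by the closed signal.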
Locking and Fallible Listeners
As mentioned in the previous section, `Source.dropListener` is not a perfect solution when it comes to managing listeners: `dropListener` may run on a different thread than the one processing the `Source`'s items, which means there is no guarantee that calling `dropListener` before the listener is resolved will prevent the listener from being resolved.
However, sometimes we do want this guarantee.
To achieve it, Gears allows a listener to declare itself dead, and `Source`s calling a dead listener should not consume the item for that listener.
How does that work in detail? How do we declare a fallible listener, and how do we handle one?
The Listener trait
Let's look at the `Listener` trait in detail:

```scala
trait Listener[-T]:
  def complete(item: T, origin: Source[T]): Unit
  val lock: ListenerLock | Null

trait ListenerLock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit
```
We've seen the `complete` method before in the `Source` section; it simply forwards the item to the listener, "completing" it.
However, a `Listener` may also declare itself to have a `lock`, which must be acquired before calling `complete`.
Most listeners, however, are not fallible and do not require a lock, in which case the `Listener` just declares `lock = null`.
The listener lock:
- Returns a boolean from its `acquire` method: failing to acquire the lock means that the listener is dead, and should not be completed.
  - For performance reasons, these are blocking locks (no `using Async` in `acquire`): `Source`s should be quick in using them!
- Has a `release` method: in case the `Source` no longer has the item after acquiring the lock, it can be released without completing the `Listener`. It does not have to be called when the `Listener` is `complete`d: the `Listener` will release the lock by itself! (See the diagram below.)
- Exposes a `selfNumber`, which is required to be unique for each `Listener`.

  This is used in the case where a `Source` needs to synchronize between multiple listeners, to prevent deadlocking.

  One can simply use a `NumberedLock` to implement `ListenerLock` with the appropriate numbering.
Source and Listener interaction
The diagram for the handshake process between a `Source` and a `Listener` is as follows:
(Diagram: handshake process between a Source and a Listener.)
However, if the `Source`'s status for the item never changes (the item is always available), `Listener.completeNow` is a shortcut handler for the entire flow, only letting you know whether the listener was alive and received the item.
The handshake process also allows for more complex scenarios involving the synchronization of multiple handlers.
For example, the implementation of `SyncChannel` requires both a sending and a receiving listener to be alive in order to proceed.
We shall not go into details in this guide.
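The handshake can be modelled in a few lines of plain Scala. The sketch below deliberately does not use Gears types: `Lock`, `Sink`, `OneShot` and `SingleSlot` are hypothetical, simplified stand-ins that mirror the `ListenerLock` and `Listener` traits above (without the `origin` parameter). `OneShot` plays the role of a listener that dies after its first item, similar to the wrapper that `race` installs. `offer` is the `Source` side of the protocol, and `completeNow` is the shortcut for an item that is always available. `selfNumber` is carried along but unused, since only one listener is involved at a time.

```scala
//> using scala 3.4.0
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Simplified stand-in for ListenerLock (hypothetical, not the Gears type).
trait Lock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit

// Simplified stand-in for Listener (hypothetical, no `origin` parameter).
trait Sink[-T]:
  val lock: Lock | Null
  def complete(item: T): Unit // called with the lock held; the sink releases it

  // Shortcut for an item that is always available.
  def completeNow(item: T): Boolean =
    val l = lock
    if l == null || l.acquire() then
      complete(item)
      true
    else false

// A fallible listener: accepts at most one item, then declares itself dead.
final class OneShot[T](id: Long) extends Sink[T]:
  @volatile var received: Option[T] = None
  private val mutex = ReentrantLock()
  private val dead = AtomicBoolean(false)

  val lock: Lock = new Lock {
    val selfNumber: Long = id
    def acquire(): Boolean =
      mutex.lock()
      if dead.get() then
        mutex.unlock() // refuse: this listener already received an item
        false
      else true        // keep holding the mutex until complete or release
    def release(): Unit = mutex.unlock()
  }

  def complete(item: T): Unit =
    received = Some(item)
    dead.set(true)  // from now on, acquire() fails
    lock.release()  // the listener releases its own lock after completing

// The Source side of the handshake, for a source holding at most one item.
final class SingleSlot[T](initial: T):
  private var item: Option[T] = Some(initial)

  def offer(listener: Sink[T]): Boolean = synchronized {
    val l = listener.lock
    if l != null && !l.acquire() then false // dead listener: the item is not consumed
    else
      item match
        case Some(v) =>
          item = None
          listener.complete(v) // hand over the item; the listener releases its lock
          true
        case None =>
          if l != null then l.release() // nothing to give: release without completing
          false
  }
```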
Cancellables and Scoping rules
In the section about Structured Concurrency, we learned that `Async` contexts can manage linked `Future`s.
Let's explore this concept in detail in this section.
CompletionGroup
Every `Async` context, whichever way it is created (`Async.blocking`, `Async.group` and `Future.apply`), comes with a brand new `CompletionGroup`, which is a set of `Cancellable`s.
When the `Async` context's body ends, this `CompletionGroup` calls `.cancel` on all its `Cancellable`s and waits for all of them to be removed from the group.
A `Cancellable` is added to a group by calling `add` on the group, or by calling `link` on the `Cancellable`.
Since every `Async` context contains a `CompletionGroup`, an overload of `link` exists that adds the `Cancellable` to the `Async` context's group.
Async-scoped values
If you have data that needs to be cleaned up when the `Async` scope ends, implement `Cancellable` and `link` the data to the `Async` scope:
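```scala
import gears.async.*
import java.io.Closeable

case class CloseCancellable(c: Closeable) extends Cancellable:
  def cancel(): Unit =
    c.close()
    unlink() // leave the group, so that it can finish waiting

extension (closeable: Closeable)
  def link()(using Async) =
    CloseCancellable(closeable).link()
```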
However, note that this could create dangling resources that are linked to the `Async` context passed into a function:

```scala
def f()(using Async) =
  val resource = new Resource()
  resource.link() // links to the Async context passed into f, i.e. the caller's

def main() =
  Async.blocking:
    f()
    // resource is still alive
    g()
    // ...
    0
  // resource is cancelled *here*
```
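A runnable sketch of this lifetime issue, assuming Gears 0.2.0 and the `CloseCancellable` helper above (repeated so the block stands alone). `Resource` is a hypothetical `Closeable` that only records whether it was closed. Wrapping the call in `Async.group` is one way to give the resource a tighter scope: it is then closed as soon as the group's body ends, while the resource linked to the `Async.blocking` context stays open until `blocking` returns.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean

case class CloseCancellable(c: Closeable) extends Cancellable:
  def cancel(): Unit =
    c.close()
    unlink()

extension (closeable: Closeable)
  def link()(using Async) =
    CloseCancellable(closeable).link()

// Hypothetical resource that only records whether it has been closed.
class Resource extends Closeable:
  val closed = AtomicBoolean(false)
  def close(): Unit = closed.set(true)

def f()(using Async): Resource =
  val resource = Resource()
  resource.link() // linked to whichever Async context the caller passes in
  resource

// (outer resource open inside blocking, inner resource open after its group, outer closed after blocking)
def demo(): (Boolean, Boolean, Boolean) =
  val (res, openInside, innerOpen) = Async.blocking:
    val outer = f()               // linked to the Async.blocking context
    val inner = Async.group(f())  // linked to a nested group, closed when the group ends
    (outer, !outer.closed.get(), !inner.closed.get())
  (openInside, innerOpen, res.closed.get())
```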
Unlinking
Just as `Cancellable`s can be `link`ed to a group, you can also `unlink` them from their current group.
This is one way to create a dangling active `Future` that is indistinguishable from a passive `Future` (needless to say, this is very much not recommended).
To write a function that creates and returns an active `Future`, write a function that takes an `Async.Spawn` context:

```scala
def returnsDangling()(using Async.Spawn): Future[Int] =
  Future:
    longComputation()
```

Again, this is a pattern that breaks structured concurrency and should not be used carelessly! One should avoid exposing functions that follow this pattern in the public API of a library that uses Gears.
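For illustration, a caller of such a function, assuming Gears 0.2.0. `longComputation` is a hypothetical stand-in for real work. The returned `Future` outlives the body of `returnsDangling`, but it is linked to the `Async.Spawn` context that the caller passed in, so it is still cancelled when the caller's scope ends.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given

// Hypothetical stand-in for real work.
def longComputation(): Int = (1 to 1000).sum

def returnsDangling()(using Async.Spawn): Future[Int] =
  Future:
    longComputation()

def caller(): Int =
  Async.blocking:
    val f = returnsDangling() // linked to this Async.blocking scope, not to returnsDangling
    f.await
```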