Introduction to Gears
This is a book about Gears, an experimental asynchronous programming library for Scala 3!
Gears aim to provide a basis for writing cross-platform high-level asynchronous code with direct-style Scala and structured concurrency, while allowing library implementations to deal with external asynchronous IO with featureful primitives and expose a simple direct-style API.
While Gears is currently in experimental stage (definitely not recommended for production!), we provide basic support for
- Virtual-threads-enabled JVM environments (JRE 21 or later, or JRE 19 with experimental virtual threads enabled)
- Scala Native 0.5.0 or later with delimited continuations support (on Linux, MacOS and BSDs).
Adding Gears in your own Scala Project
Gears can be added to your own project by adding the following dependency, by your preferred build tool:
- With
sbt:libraryDependencies += "ch.epfl.lamp" %%% "gears" % "0.2.0", - With
mill:def ivyDeps = Agg( // ... other dependencies ivy"ch.epfl.lamp::gears::0.2.0" ) - With
scala-cli://> using dep "ch.epfl.lamp::gears::0.2.0"
Setting up gears
Running Examples in this Book
The book contains some examples in the src/scala directory.
For example, this is the hello example, located in src/scala/hello.scala:
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
import gears.async.*
import gears.async.default.given
@main def main() =
Async.blocking:
val hello = Future:
print("Hello")
val world = Future:
hello.await
println(", world!")
world.await
All such examples can be run with scala-cli provided the following dependencies:
- A locally published version of
gears. - Additional platform dependencies, as described below.
Compiling gears
To compile gears from source, clone gears from the GitHub repository:
git clone https://github.com/lampepfl/gears --recursive
And run rootJVM/publishLocal.
For Scala Native
Make sure to prepare additional dependencies for Scala Native.
You would also need to locally publish a custom version of munit for Scala Native 0.5.0, which is available as a
git submodule from the gears repository.
To locally publish munit, from the root directory of gears repository:
git submodule update # Create and update submodules
dependencies/publish_deps.sh # Locally publish pinned dependencies
Finally, gears artifacts for Scala Native can be locally published by
sbt rootNative/publishLocal
Trying out examples
On JVM
As mentioned in the introduction, we require a JVM version that supports virtual threads. With that provided, examples can be run simply with
scala-cli run "path-to-example"
For the hello example:
scala-cli run src/scala/hello.scala
On Scala Native
The current Scala Native version required by Gears is 0.5.1.
Therefore examples in this book hardcodes 0.5.1 as the Scala Native version, which
is also the version gears is currently compiled against.
Examples can be run with
scala-cli --platform scala-native run "path-to-example"
For the hello example:
scala-cli --platform scala-native run src/scala/hello.scala
An "Async" function
The central concept of Gears programs are functions taking an Async context,
commonly referred to in this book as an async function.
import gears.async.*
def asyncFn(n: Int)(using Async): Int =
// ^^^^^ an async context
AsyncOperations.sleep(n * 100 /* milliseconds */)/*(using Async)*/
n + 1
We can look at the Async context from two points of view, both as a capability
and a context:
- As a Capability,
Asyncrepresents the ability to suspend the current computation, waiting for some event to arrive (such as timeouts, external I/Os or even other concurrent computations). While the current computation is suspended, we allow the runtime to use the CPU for other tasks, effectively utilizing it better (compared to just blocking the computation until the wanted event arrives). - As a Context,
Asyncholds the neccessary runtime information for the scheduler to know which computation should be suspended and how to resume them later. It also contains a concurrency scope, which we will see later in the structured concurrency section.
However, different from other languages with a concept of Async functions, gears's async functions are
just normal functions with an implicit Async parameter!
This means we can also explicitly take the parameter as opposed to using Async:
def alsoAsyncFn(async: Async): Int =
asyncFn(10)(using async)
and do anything with the Async context as if it was a variable1!
Passing on Async contexts
Let's look at a more fully-fledged example, src/scala/counting.scala:
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
import gears.async.*
import gears.async.default.given
/** Counts to [[n]], sleeping for 100milliseconds in between. */
def countTo(n: Int)(using Async): Unit =
(1 to n).foreach: i =>
AsyncOperations.sleep(100 /* milliseconds */ ) /*(using Async)*/
println(s"counted $i")
@main def Counting() =
Async.blocking: /* (async: Async) ?=> */
countTo(10)
println("Finished counting!")
(if you see //> directive on examples, it means the example can be run self-contained!)
Let's look at a few interesting points in this example:
-
Async.blocking:Async.blockinggives you a "root"Asynccontext, given implementations of the supporting layer (neatly provided byimport gears.async.default.given!).This defines a "scope", not too different from
scala.util.Usingorscala.util.boundary. As usual, the context provided within the scope should not escape it. Likescala.util.boundary'sboundary.Label, theAsynccontext will be passed implicitly to other functions takingAsynccontext. Note thatAsync.blockingis the only provided mean of creating anAsynccontext out of "nothing", so it is easy to keep track of when adoptinggearsinto your codebase! -
countToreturnsUnitinstead ofFuture[Unit]or an equivalent. This indicates that *callingcountTowill "block" the caller untilcountTohas returned with a value. Noawaitneeded, andcountTobehaves just like a normal function!Of course, with the
Asynccontext passed,countTois allowed to perform operations that suspends, such assleep(which suspends theAsynccontext for the given amount of milliseconds).This illustrates an important concept in Gears: in most common cases, we write functions that accepts a suspendable context and calling them will block until they return! While it is completely normal to spawn concurrent/parallel computations and join/race them, as we will see in the next chapter, "natural" APIs written with Gears should have the same control flow as non-
Asyncfunctions. -
Within
countTo, note that we callsleepunder a function passed intoSeq.foreach, effectively capturingAsyncwithin the function. This is completely fine:foreachruns the given function in the sameAsynccontext (not outside nor in a sub-context), and does not capture the function. Our capability and scoping rules is maintained, andforeachisAsync-capable by default!While this illustrates the above fact, we could've just written the function in a familiar fashion with
forcomprehension:/** Counts to [[n]], sleeping for 100milliseconds in between. */ def countTo(n: Int)(using Async): Unit = for i <- 1 to n do AsyncOperations.sleep(100.millis) println(s"counted $i")
That's all for now with the Async context. Next chapter, we will properly introduce concurrency to our model.
Aside: Pervasive Async problem?
At this point, if you've done asynchronous programming with JavaScript, Rust or C# before, you might wonder if Gears is offering a solution that comes with the What color is your function? problem.
It is true that the Async context divides the function space into ones requiring it and ones that don't,
and generally you need an Async context to call an async function2.
However:
-
Writing async-polymorphic higher order functions is trivial in Gears: should you not be caring about
Asynccontexts when taking in function arguments (() => TandAsync ?=> Tboth works), simply take() => TandAsync-aware blocks will inherit the context from the caller!One obvious example is
Seq.foreachfrom above. In fact, all current Scala collection API should still work with no changes. Of course, if applied in repeat the function will be run sequentially rather than concurrently, but that is the expected behavior ofSeq.foreach. -
The precense of an
Asynccontext helps explicitly declaring that a certain function requires runtime support for suspension, as well as the precense of a concurrent scope (for cancellation purposes). Ultimately, that means the compiler does not have to be pessimistic about compiling all functions in a suspendable way (a la Go), hurting both performance and interopability (especially with C on Scala Native).For the user, is also a clear indication that calling the function will suspend to wait for (in most cases) external events (IO, filesystem, sleep, ...), and should prepare accordingly. Likewise, libraries using Gears should not be performing such tasks if they don't take an
Asynccontext.
With that in mind, it is useful (as with all languages with async functions) to treat Async like a capability,
only pass them in functions handling async operations, and try to isolate business logic into functions that don't.
While in principle this is possible, capability and scoping rules apply to the Async context: functions taking
Async capabilities should not capture it in a way that stays longer than the function's body execution.
In the future, capture checking should be able to find such violations and report them during the compilation process.
Technically Async.blocking can be used to call any async function without an async context,
you should be aware of its pitfalls. That said, if you are migrating from a synchronous codebase,
they are functional bridges between migrated and in-progress parts of the codebase.
Concurrency with Future
Future
is the primary source of concurrency in a Gears program.
There are actually two kinds of futures: passive and active. However,
we shall look at active futures for now, and come back with passive futures
in the next chapter.
Spawning a concurrent computation with Future.apply
Future.apply
takes a body of the type Async.Spawn ?=> T (that is, an async computation)
and runs it completely concurrently to the current computation.
Future.apply returns immediately with a value of type Future[T].
Eliding details (that we shall come back next chapter), there are two things you can do with this Future[T]:
-
Await: calling
.awaiton theFuture[T]suspends the current Async context until theFuture's body returns, and give you back the returned valueT. Ifbodythrows an exception,.awaitwill also throw as well..awaitResultis the alternative where both cases are wrapped inside aTry.Being a suspension point,
.awaitrequires an Async context. -
Cancel: calling
.cancel()on theFuture[T]sends a cancellation signal to theFuture. This would causebody's execution to receiveCancellationExceptionon the next suspension point, and cancel all of thebody's spawned Futures and so on, in a cascading fashion.After cancelling,
.await-ing theFuturewill raise aCancellationException.
An example
We can have a look at one simple example, implementing my favorite sorting algorithm, sleepsort!
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
import scala.collection.mutable
import scala.concurrent.duration.*
import gears.async.*
import gears.async.default.given
@main def sleepSort() =
Async.blocking: /* (spawn: Async.Spawn) ?=> */
val origin = Seq(50, 80, 10, 60, 40, 100)
// Spawn sleeping futures!
val buf = mutable.ArrayBuffer[Int]()
origin
.map: n =>
Future /*(using spawn)*/: /* (Async) ?=> */
AsyncOperations.sleep(n.millis)
buf.synchronized:
buf += n
.awaitAll
println(buf)
Let's walk through what's happening here:
- Starting from a
Seq[Int], we.mapthe elements each to create aFuture[Unit]. CallingFuture:gives us a newAsynccontext, passed into theFuture's body. ThisAsynccontext inherits the suspension implementation fromAsync.blocking's context, and has a sub-scope withAsync.blocking's context as its parent. We will talk about scoping in the next section. - In each
Future, wesleepfor the amount of milliseconds the same as the element's value itself. Note thatsleepwould suspend theAsynccontext given by theFuture, i.e. the future's body, but not the one running underAsync.blocking. Hence, it is totally okay to have thousands ofFutures thatsleep! .mapgives us back aSeq[Future[Unit]]immediately, which we can wait for all futures to complete with the extension method [.awaitAll]. This suspendsAsync.blockingcontext until all futures run to completion, and gives us backSeq[Unit]. The return value is not interesting though, what we care about is thebufat the end.
Async.Spawn
If you noticed, Async.blocking gives you an Async.Spawn context
(rather than just Async), which Future.apply
requires. What is it?
Recall that Async, as a capability, gives you the ability to suspend the computation to wait for other values.
Async.Spawn adds a new capability on top: it allows you to spawn concurrent computations as well.
Getting a Spawn capability is actually very simple. You get an Async.Spawn capability on Async.blocking by default,
and both Future.apply and Async.group gives you an Async.Spawn capability sub-scoped from a parent Async scope.
Note that, however, most functions do not take an Async.Spawn context by default.
This is due to the nature of Spawn's capability to spawn computations that runs as long as the Spawn scope is alive, which typically corresponds to the lexical scope of the function.
If functions take Async.Spawn, they are allowed to spawn futures that are still computing even after the function itself returns!
def fn()(using Async.Spawn) =
val f = Future:
useCPUResources()
0
Async.blocking:
val n = fn()
// f is still running at this point!!!
This is way more difficult to reason about, so the "sane" behavior is to not take Async.Spawn by default, and only
"upgrading" it (with Async.group) inside a scope that will take care of cleaning up these "dangling" futures on return.
Most of Gear's library are structured this way, and we strongly recommend you to do the same in your function designs.
Groups, Scoping and Structured Concurrency
If you've been following the book thus far, you've seen a lot of mentions of
"an Async scope" and "sub-scoping". What exactly are those "scopes", and why
do they matter?
That's exactly what this section is about!
tl;dr: Gears Structured Concurrency
We can sum up the content of this section as below. For more details, read on!
Asynccontexts form a tree, rooted byAsync.blocking, each node associated by abodycomputation. AllAsynccontexts only returns afterbodycompletes.- A child
Asynccontext can be created by eitherAsync.group, creating a direct child context "blocking" its caller, grantingAsync.Spawn.Future.apply, branching a concurrent computation linked to the currentAsynccontext.
- When
bodycompletes, all linkedFutures are cancelled, and theAsynccontext returns after the cancellation is completely handled.
With these three properties, we guarantee that calling using Async functions will properly suspend if needed,
and cleans up properly on return.
In Gears, Async scopes form a tree
Let's look at the only three1 introduction points for an Async context in gears.async, and examine their signatures:
-
Async.blocking's signature isobject Async: def blocking[T](body: Async.Spawn ?=> T)(using AsyncSupport, Scheduler): TJust as the name suggest,
Async.blockinggives you anAsynccontext from thin air, but in exchange, whatever suspending computation happening inbodycausesblockingto block. Hence,blockingonly returns whenbodygives back aT, whichblockinghappily forwards back to the caller.Looking at it from the perspective of the call stack,
blockinglooks like this...
Pretty simple, isn't it? With blocking, you run suspending code in a scope that cleans up itself, just like returning from a function! -
Async.group's signature isobject Async: def group[T](inner_body: Async.Spawn ?=> T)(using Async): TAsync.grouptakes anAsyncscope, and starts a child scope under it. This scope inherits the suspend and scheduling implementation of the parentAsync, but gives theinner_bodya scope of its own...
It is actually almost the exact same as Async.blockingfrom the perspective of the caller (with anAsynccontext!)Async.groupalso "blocks" (suspending if possible, of course) untilinner_bodycompletes.So far so good, and our scope-stack is still a linear stack! Let's introduce concurrency...
-
Finally,
Future.apply's signature isobject Future: def apply[T](future_body: Async.Spawn ?=> T)(using Async.Spawn): TFuture.applystarts a concurrent computation that is bound to the givenAsync.Spawnscope. What does that mean? Looking at the call stack...
We can see that Future.applyimmediately returns (tobody) a reference to theFuture. In the meantime, a newAsync.Spawnscope (namedFuture.applyhere) is created, runningfuture_body. It is linked to the originalAsync.Spawnpassed intoFuture.apply.We can now see that
Asyncscopes, with both its associatedbodyand linkedFuture.apply'sAsyncscopes, now forms a tree!
To illustrate what exactly happens with linked scopes as the program runs, let's walk through the cases of interactions between the scopes!
Technically Task.start
also introduces an Async context, but it is exactly equivalent to Future.apply(Task.run()).
Interactions between scopes
.awaiting a Future
When you .await a future, the Async scope used to .await on that Future will suspend the computation until the
Future returns a value (i.e. its future_body is complete).

Note that the Async context passed into .await does not have to be the same Async.Spawn context that the Future
is linked to!
However, for reasons we'll see shortly, active Futures will always be linked to one of the ancestors of body's Async context
(or the same one).
Another thing to note is that, upon completion, the Future unlinks itself from the Async.Spawn scope.
This is not very important, as the Future already has a return value! But it is a good detail to notice when it comes to cleaning up.
Cleaning up: after body completes
What happens when body of an Async context completes, while some linked Future is still running?

In short: When body completes, all linked child scopes are .cancel()-ed, and Async will suspend until no more
child scopes are linked before returning.
This is especially important!
It means that, futures that are never .awaited will get cancelled. Make sure to .await futures you want to complete!

Cleaning up: when cancel is signalled
During a Future's computation, if it is .cancel-ed mid-way, the body keeps running (as there is no way to suddenly
interrupt it without data loss!)
However, if .await is called with the Future's Async context (i.e. a request to suspend), it will immediately
throw with a CancellationException.
The body may catch and react to this exception, e.g. to clean up resources.
The Future context will only attempt to unlink after body has returned, whether normally or by unwinding an exception.

Delaying cancellation with uninterruptible
The above .await behavior (throwing CancellationException) can be delayed within a Future by running the code under an
uninterruptible scope.
This is useful for asynchronous clean-up:
Future:
val resource = acquireResource()
try
useResource(resource)
finally
Async.uninterruptble:
resource.cleanup()/*(using Async)*/
without uninterruptble, suspensions done by resource.cleanup will again throw CancellationExceptions.
Working with multiple Futures
While writing code that performs computation concurrently, it is common to spawn multiple futures and then await for them at the same time, with some particular logic.
Gears provides some tools to deal with multiple futures, both for a statically known number and
unknown number of futures (i.e. if you have a Seq[Future[T]]).
Combining two futures
Future.zip
takes two futures Future[A] and Future[B] and returns a Future[(A, B)],
completing when both input futures do.
Async.blocking:
val a = Future(1)
val b = Future("one")
val (va, vb) = a.zip(b).await // (1, "one")
if one of the input futures throw an exception, it gets re-thrown as soon as possible (i.e. without suspending until the other completes).
Future.or
takes two Future[T]s and returns another Future[T], completing with the first value
that one of the two futures succeeds with.
If both throws, throw the exception that was thrown last.
Async.blocking:
val a = Future(1)
val b = Future:
AsyncOperations.sleep(1.hour)
2
val c = Future:
throw Exception("explode!")
a.or(b).await // returns 1 immediately
b.or(a).await // returns 1 immediately
c.or(a).await // returns 1 immediately
c.or(c).await // throws
a.orWithCancel(b).await // returns 1 immediately, and b is cancelled
Future.orWithCancel
is a variant of Future.or where the slower Future is cancelled.
Combining a Sequence of futures
Seq[Future[T]].awaitAll
takes a Seq[Future[T]] and return a Seq[T] when all futures in the input Seq
are completed.
The results are in the same order corresponding to the Futures of the original Seq.
If one of the input futures throw an exception, it gets re-thrown as soon as possible (i.e. without
suspending until the other completes).
It is a more performant version of futures.fold(_.zip _).await.
Async.blocking:
val a = Future(1)
val b = Future:
AsyncOperations.sleep(1.second)
2
val c = Future:
throw Exception("explode!")
Seq(a, b).awaitAll // Seq(1, 2), suspends for a second
Seq(a, b, c).awaitAll // throws immediately
awaitAllOrCancel
is a variant where all futures are cancelled when one of them throws.
Seq[Future[T]].awaitFirst
takes a Seq[Future[T]] and return a T when the first future in the input Seq succeeds.
If all input futures throw an exception, the last exception is re-thrown.
It is a more performant version of futures.fold(_.or _).await.
Async.blocking:
val a = Future(1)
val b = Future:
AsyncOperations.sleep(1.second)
2
val c = Future:
throw Exception("explode!")
Seq(a, b).awaitFirst // returns 1 immediately
Seq(a, b, c).awaitFirst // returns 1 immediately
awaitFirstWithCancel
is a variant where all other futures are cancelled when one succeeds.
Async.select
Async.select
works similar to awaitFirst, but allows you to attach a body to handle the returned value, so the futures
being raced does not have to be of the same type (as long as the handler returns the same type).
Compared to creating more futures, Async.select also guarantees that exactly one of the handlers
is run, so you can be more confident putting side-effects into the handlers.
Async.blocking:
val a = Future(1)
val b = Future("one")
val v = Async.select(
a.handle: va =>
s"number $va was returned",
b.handle: vb =>
s"string `$vb` was returned"
)
println(v) // either "number 1 was returned", or "string `one` was returned"
Async.select takes SelectCase* varargs, so you can also .map a sequence of Future as well.
Async.blocking:
val futs = (1 to 10).map(Future.apply)
val number = Async.select(futs.map(_.handle: v => s"$v returned")*)
Future.Collector
Future.Collector
takes a sequence of Future and exposes
a ReadableChannel1
returning the Futures as they are completed.
Collector allows you to manually implement handling of multiple futures as they arrive, should you need
more complex behaviors than the ones provided.
For example, here is a simplified version of .awaitFirst:
def awaitFirst[T](futs: Seq[Future[T]])(using Async): T =
val len = futs.length
val collector = Future.Collector(futs*)
@tailrec def loop(failed: Int): T =
val fut = collector.results.read()
fut.awaitResult match
case Success(value) => value
case Failure(exc) =>
if failed + 1 == len then throw exc
else loop(failed + 1)
loop(0)
MutableCollector is a Collector
that also lets you add more
Futures after creation.
learn more about channels in a future chapter.
Supervising with Retries and Timeouts
With the ability to sleep non-blockingly, one might immediately think about implementing supervision of tasks with retrying behavior upon failure.
Gears provide some building blocks, as well as interfaces to customize the retry policies.
Timeout
The most simple supervision function is withTimeout,
which takes a duration and an async block (Async ?=> T). It runs the async block racing with the timeout, and
throws TimeoutException if the timeout runs out before the async block, cancelling the
async block afterwards.
Async.blocking:
withTimeout(60.seconds):
val body = request.get("https://google.com")
// ...
withTimeoutOption
is a variant that wraps the output in Option[T] instead, returning None on timeout.
Retry
Similar to withTimeout, Retry
takes a block and executes it, "blocking" the caller until its policy has finished.
Retry allows you to specify:
- when to consider the block as complete (until success, until failure, never)
- how many consecutive failures should fail the policy
- how much to delay in between attempts through the
Delaytrait- Some delay policies are provided, such as a constant delay and an exponential backoff algorithm.
Async.blocking:
Retry
.untilSuccess
.withMaximumFailures(5)
.withDelay(Delay.exponentialBackoff(maximum = 1.minute, starting = 1.second, jitter = Jitter.full)):
val body = request.get("https://google.com")
// ...
Inter-future communication with Channels
For immutable data, it is easy and recommended to directly
capture references to them from within Futures. However, to share mutable data
or to communicate between concurrent computations, Gears recommend following
Effective Go's slogan on sharing:
Do not communicate by sharing memory; instead, share memory by communicating.
To facillitate this, Gears provides channels, a simple mean of pipe-style communication between computations.
Channels
Channel can be thought
of as a pipe between senders and readers, with or without a buffer in between. The latter
differentiates between the types of channels provided by Gears.
Nevertheless, all channels provide the following methods:
-
sendtakes a valueTand attempt to send it through the channel. Suspends until the value has been received or stored in the channel's buffer (if one exists).If the channel is closed,
sendwill throwChannelClosedException. -
readsuspends until a value is available through the channel, consuming it from the channel.readreturnsEither[Closed, T], withLeft(Closed)returned in the obvious case. -
closesimply closes the channel.
Channels provide the following guarantees:
- Two sequential
sends (onesendfollowed by anothersend, not done concurrently) will bereadin the same order. - An item that is
sendis alwaysreadexactly once, unless if it was in the channel's buffer when the channel is closed.
Types of channels
Gears provide 3 types of channels:
-
SyncChannelare channels without a buffer, so everysendwill suspend until a correspondingreadsucceeds.Since there are no buffers, an item that is
sendis alwaysreadexactly once. -
BufferedChannelare channels with a buffer of a fixed size.sends will succeed immediately if there is space in the buffer, and suspend otherwise, until there is space for it.When
cancelled, items in the buffer are dropped. -
UnboundedChannelare channels with a buffer that is infinitely growable.sends always succeed immediately. In fact,UnboundedChannelprovidessendImmediatelythat never suspends.When
cancelled, items in the buffer are dropped.
Restricted channel interfaces
When writing functions taking in channels, it is often useful to restrict them to be read-only or send-only.
Gears Channel trait is composed of
ReadableChannel,
SendableChannel and
Closeable, so any channel can be upcasted to one of the above three interfaces.
Async.select with channels
Async.select can be used with channels. To do so, use
.readSource for reading and
.sendSource(T) for sending; before
attaching .handle and providing a handler.
Async.blocking:
val fut = Future:
AsyncOperations.sleep(1.minute)
10
val ch = SyncChannel[Int]()
val readFut = Future:
ch.read().right.get + 1
Async.select(
fut.handle: x => println(s"fut returned $x"),
ch.sendSource(20).handle:
case Left(Closed) => throw Exception("channel closed!?")
case Right(()) =>
println(s"sent from channel!")
println(s"readFut returned ${readFut.await}")
)
Similar to future handlers, Async.select guarantees that exactly one of the handlers are run.
Not only so, it also guarantees that only the channel event with the handler that is run will go through!
For example, in the following snippet, it guarantees that exactly one channel is consumed from for each loop:
val chans = (1 to 2).map: i =>
val chan = UnboundedChannel()
chan.sendImmediately(i)
chan
for i <- 1 to 2 do
val read = Async.select(
chans(0).readSource.handle: v => v,
chans(1).readSource.handle: v => v,
)
println(read) // prints 1 2 or 2 1
It is possible to mix reads and sends within one Async.select, or mix channel operations with futures!
Channel in Use: Actor pattern
One of the common patterns of using channels is to write functions that continuously take inputs from one channel, process them, and send their output into another channel. This is sometimes referred to as an Actor, or a Communicating Sequential Process.
Let's look at one of the examples, inspired by Rob Pike's talk on concurrency patterns in Go. This is an implementation of a concurrent prime sieve.
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
//> using nativeMode "release-full"
import java.io.Closeable
import scala.annotation.tailrec
import scala.util.boundary
import gears.async.*
import gears.async.default.given
// Sender that owns the channel
type OwnedSender[T] = SendableChannel[T] & Closeable
// Send the sequence 2, 3, 4, ..., until and then close the channel
def generate(until: Int)(ch: OwnedSender[Int])(using Async) =
for i <- 2 to until do ch.send(i)
ch.close()
// Filter out multiples of k
def sieve(
k: Int
)(in: ReadableChannel[Int], out: OwnedSender[Int])(using Async) =
@tailrec def loop(): Unit =
in.read() match
case Left(_) => ()
case Right(n) =>
if n % k != 0 then out.send(n)
loop()
loop()
out.close()
@main def PrimeSieve(n: Int) =
Async.blocking:
// set up sieves
val inputChan = SyncChannel[Int]()
// start generating numbers
Future(generate(n)(inputChan))
// Collect answers
@tailrec def loop(input: ReadableChannel[Int]): Unit =
input.read() match
// no more numbers
case Left(_) => ()
// new prime number
case Right(n) =>
println(s"$n is prime")
// filter out multiples of n
val chan = SyncChannel[Int]()
Future(sieve(n)(input, chan))
loop(chan)
loop(inputChan)
Some notable points:
generateandsieveare bothusing Asyncfunctions, they block the caller until their work has finished. We want to run them concurrently as actors, we spawn them withFuture.apply.- Both are given channels to write into, and the permission to close them. This is a common pattern, useful when your channels are single sender, multiple readers1. In such case, it is common for the sender to signal that there are no more inputs by simply closing the channel. All readers are then immediately notified, and in this case (where readers are senders themselves in a pipeline), they can forward this signal by closing their respective channels.
in the case where multiple senders are involved, the logic is much more complicated. You might want
to look into ChannelMultiplexer.
Sources as a Concurrency Primitive
With async functions, active Futures and channels we should be able to write our program, and make them easily reasonable with structured concurrency.
However, often we use concurrency to deal with asynchronous external events, such as file I/O, networking, sockets, ... From the program's point of view, this is completely unstructured: often you ask for an operation, and at some point in the future (out of our control), we get back a response. This does not mix well with our current concepts, where callers always manage its own execution flow and structure.

To deal with such asynchronous events, Gears provide some interfaces to be used as bridges between the unstructured world and
structured concurrency programs.
These come in the forms of Async.Sources.
Callbacks and push
The most basic concept of requesting an asynchronous operation is through a callback.
Let's illustrate this through a very simple example, of reading a file.
object File:
def readString(path: String)(callback: Either[Error, String] => Unit): Cancellable
Suppose we have a non-blocking API of reading a file at path. The content of the file will be read into a String, which
is passed into callback (or, if errors come up, they also get passed into callback).
Rather than the operation result, readString's caller receives a request object, which has a cancel method to attempt
to cancel the read operation.

The flow of callback-based operations is as follows:
-
bodycallsFile.readString(path)(callback), getting back aCancellableimmediately. -
bodydoes not control when or howcallbackis called, and has no provided way to interact withcallback.In reality, should
bodywant to wait forcallback's result, it can either- Put all handling logic past
readStringintocallback, turningcallbackinto a full continuation. - Introduce blocking/suspending through some synchronization primitive (channel, mutex).
Without
Asyncthe latter option introduces blocking code, which effectively nullifies the advantage of using non-blocking API in the first place. Therefore, traditional languages with callbacks (Node.js is an infamous example) chose the former, creating the problem of callback hell. - Put all handling logic past
-
At some time in the future,
callbackis called with the result. This might happen whilebodyis still executing, or after. Depending on parallelism capabilities, it may be executed in parallel tobody.
How Async makes callbacks easier
In Gears however, we have Async computations: computations that can suspend to wait for a future value! This means,
given the same API, we can do the following:
- Suspend the current
Asynccontext - Issue the asynchronous request, passing the resumption of the
Asynccontext as the callback.

And, in essence, this is exactly what Async.await does!
Let's have a now fully-expanded signature of Async.await:
trait Async:
def await[T](src: Source[T]): T
With our knowledge of the Async-callback flow, we know that src would be some callback-taking future value.
Async.await would then do the suspend / set-resume-callback procedure, and return the output of the src as if we continued
from within the callback.
This suspension/resume mechanism is how Async was able to let you await for concurrent Futures (which is also a Source!)
without being inside a Future itself, monadic style.
The Source trait
Now that we have an idea of what a Source is, we can now look at how
Source is defined:
trait Source[+T]:
def onComplete(listener: Listener[T]): Unit
def poll(listener: Listener[T]): Boolean
def poll(): Option[T]
def dropListener(listener: Listener[T]): Unit
inline def awaitResult(using Async): T = Async.await(this)
// Included for convenience, simplified...
trait Listener[-T]:
def complete(item: T, origin: Source[T]): Unit
-
onCompletedoes exactly what you expect: when an item is available,listeneris called with the item.listeneralso gets a reference to theSourceitself, in case it was passed to multipleSources and needs to distinguish which source the item came from. -
poll(listener)works as a non-blocking poll: if an item is immediately available, call thelistenerwith it immediately. Otherwise, immediately returnfalse, and usually this means you'd want to put your listener intoonCompleteinstead.If an item is all you need,
poll()is a much more convenient overload: you just get the item back, orNoneif it was not immediately available. -
dropListenersignals thatlisteneris no longer needed to be called. This usually signals the cancellation of the value receiver.Note that this is not cancelling the
Sourceitself:Sourcedoes not have a concept of cancellation! Not all asynchronous operations are cancellable.
Finally, awaitResult is just a convenience postfix method for Async.await.
In special cases where T is Either or Try (Future[T] is a Source[Try[T]]), we have .await as a convenience
postfix method that also unwraps the value!
Persistent and Ephemeral Sources
So far, things seem clear when we have one Source and one Listener.
What happens when we pass multiple listeners into a Source?
The answer depends a lot on what the source is representing:
-
Should we write the above
readStringfunction in terms ofSourceinstead:object File: def readString(path: String): Source[Either[Error, String]] & Cancellablethen the
Sourcereturned represents a single asynchronous request. This request returns once, so no matter how many listeners are attached to it, they should get the same value every time.We call Sources where all listeners get the same value, resolved at the same time1, a persistent source. Especially, if a source is both persistent and cancellable, it is a
Future. We shall see the "real" interface of aFuturein the next section.// "correct" definition object File: def readString(path: String): Future[Either[Error, String]] -
Suppose we want to capture signals sent to our program. We might use a signal handler like this
object Signals: def capture(signals: Signal*): Source[Signal]in which our
Sourcerepresents a subscription to the system process' signal handler. This will deliver multiple signals to our program, and each listener attached to thisSourceshould get one, possibly different signal. We call Sources that deliver possibly different values to listeners an ephemeral source.Another example of an ephemeral source is
.readSourceof a channel.
Active and Reactive Sources
Activeness and Reactivity of Sources refer to whether values are resolved to the Source regardless of interactions with it (regarding adding/removing listeners).
Some sources, like an active Future or an asynchronous request like File.readString, will complete and
resolve with a value regardless of a listener attached to it or not.
Some other sources, like Channel.readSource, does not do anything on creation. It only attempts to
consume a value from the channel when a listener is attached to it.
From the perspective of the Source, this matters when one decides to create a Source but not await for it.
There is no way to differentiate the two sources from the signature alone, hence this should be an anti-pattern.
it is hard to say that concurrent listeners are resolved "at the same time".
We usually reason about this in terms of subsequent listeners: if one listener is attached to a persistent Source
after another has been resolved, the second listener should be resolved immediately with the same value.
Consequently, polling this Source will always give the same value back immediately.
Passive Futures and Promises
As mentioned in the last section, the Future trait represents a persistent source that is also cancellable.
trait Future[+T] extends Source[Try[T]], Cancellable
As mentioned before, Future can be active or passive1.
We have delved deeply into active Futures, and hence this section is all about passive Futures.
Even passive Futures are considered an active Source by our definition.
So what are passive Futures? Gears make a distinction between an active and a passive Future based on whether
its computation is within or outside Gears' structured concurrency.
Active Futures are actively driven by Gears, and passive Futures can be completed from outside of Gears' machineries.
Typically, passive Futures are used as a straightforward way to encapsulate callback-style functions into
Gears interface.
We've seen how the interface is created in the last section, we shall now see how we can provide the implementation.
Future.withResolver: JS-style callback-to-future conversion
Here is an example on how to implement a Future-returning version of readString given the callback one:
object File:
def readString(path: String, callback: Either[Error, String] => Unit): Cancellable =
// provided
???
def readString(path: String): Future[Either[Error, String]] =
Future.withResolver: resolver =>
val cancellable = readString(path, result => resolver.resolve(result))
resolver.onCancel: () =>
cancellable.cancel()
resolver.rejectAsCancelled()
-
Future.withResolvercreates a passiveFuturewhere completion is managed by theResolverparameter passed intobody.The role of
bodyis to set up callbacks that will complete theFutureby resolving it with a value or failing it with an exception.In the case that the callback function is cancellable,
bodycan set up throughResolver.onCancelto forward the cancellation of theFutureinto the callback function. -
Unlike
Future.apply,withResolverruns body directly, blocking the caller untilbodyis complete. It runs theFutureafterbodyhas completed setting it up.
If you are familiar with JavaScript, this is almost the same interface as its
Promise constructor.
Converting between Scala and Gears futures
Since Scala scala.concurrent.Future runs with their
own ExecutionContext, it is considered outside of Gears machinary, and hence can only be converted into a passive Gears Future.
This can be done by importing
ScalaConverters
and using the extension method .asGears on the Future.
Note that it requires an ExecutionContext, in order to run the code that would complete the Future.
Cancelling this Future only forces it to return Failure(CancellationException), as Scala Futures cannot be cancelled externally.
Converting from a Gears Future, passive or active, to a Scala Future is also possible through the
.asScala
extension method.
Futures created from active Gears futures will return Failure(CancellationException) once they go out of scope.
Manual completion of Futures with Promise
In case you need a more complicated flow than a callback, Gears provide a more manual way of creating passive Futures,
through a Future.Promise.
Simply put, it is a Future of which you can manually complete with a value
through a method.
Because there is no structure to Promise, it is never recommended to return one directly. It should be upcasted
to Future (explicitly or with .asFuture)
before returned.
One cannot forward cancellation of a Promise.
Select and Race
Async.select works on both Channels and Futures because they operate on Sources.
Its behavior generalizes to taking all Sources, with exactly one item consumed from one of the sources,
of which the corresponding handler is run.
Async.select machinery
This behavior is achieved by passing into the Sources a special Listener that would only accept the
first item resolved, and rejecting all others.
Learn more about failible in the next section.
Async.race
Under the hood, Async.select uses a more general method to combine multiple Sources into one
with a racing behavior.
object Async:
def race[T](sources: Source[T]*): Source[T]
race takes multiple sources and returns a new ephemeral, reactive Source that:
- once a listener is attached, it wraps the listener with a one-time check and forwards it to all sources.
- when one source resolves this listener with a value, invalidates this listener on all other sources.
Note that Source.dropListener is inherently racy: one cannot reliably use dropListener to guarantee
that a listener will not be run.
This is the main motivation to introduce failible listeners.
Async.raceWithOrigin is
a variant of race that also returns which of the origin sources the item comes from.
Calling listeners and Source.transformValuesWith
From the description of race, you might notice a general pattern for transforming Sources with a .map-like
behavior:
- Given a transformation function
f: T => U, one can transform aSource[T]to aSource[U]by creating a Source that... - Given a listener
l: Listener[U], create aListener[T]that appliesfbefore passing tol... - ... and pass that wrapping listener back to
Source[T].
And this is exactly what
Source.transformValuesWith does.
This looks like a very useful method, so why isn't it a basic concept, with a familiar name (.map)?
The devil lies in the details, specifically how Source notifies its Listeners.
Notice that there is no ExecutionContext or Async.Scheduler passed into onComplete.
This is because Source often represent values that come from outside the program's control, possibly running
on another thread pool. We do not want to introduce a "middleground" where spawning tasks is possible but will
implicitly live outside of structured concurrency.
With this limitation, we don't have control over how parallel or resource-intensive a Source would take
to call a Listener.
In Gears, we attempt to constrain this limitation by asking Listener implementations to be as resource-unintensive as possible.
For most cases, Listener involves just scheduling the resumption of the suspended Async context, so it is not a problem.
However, transformValuesWith allows one to easily run unlimited computation within the transform function f.
This is dangerous, so transformValuesWith has a long, tedious, greppable name to show its gnar sides.
I want to transform a Source, what do I do?
Explicitly spawn a Future that transforms each item and communicate with it through a Channel:
val ch = SyncChannel[U]()
Future:
while true do
val t = source.awaitResult
ch.send(f(t))
This is not however not always what you want: think about whether you want f and send to be run at the same time,
or awaitResult and send, or what to do when f throws, ...
Most of the time this is very situation-specific, so Gears do not provide a simple solution.
Locking and Failible Listeners
As mentioned in the previous section, Source.dropListener
is not a perfect solution when it comes to managing listeners:
dropListener is run possibly at a different thread than the one processing the Source items,
which means there is no guarantee that calling dropListener before the listener is resolved
will cause the listener to not be resolved.
However, sometimes we do want to achieve this guarantee.
To do so, Gears allow a listener to declare itself dead, and Sources calling a dead listener
should not be consuming the item to the listener.
How does that work in details, how do we declare a failible listener and how do we handle them?
The Listener trait
Let's look at the Listener trait in detail:
trait Listener[-T]:
def complete(item: T, origin: Source[T]): Unit
val lock: ListenerLock | null
trait ListenerLock:
val selfNumber: Long
def acquire(): Boolean
def release(): Unit
We've seen the complete method before in the Source section, which simply
forwards the item to the listener, "completing" it.
However, a Listener may also declare itself to have a lock,
which must be acquired before calling complete.
Most listeners are however not failible and does not require a lock, and so in such case the Listener
just declare lock = null.
The listener lock:
-
Returns a boolean in its
acquiremethod: failing to acquire the lock means that the listener is dead, and should not be completed.- For performance reasons, they are blocking locks (no
using Asyncinacquire):Sources should be quick in using them!
- For performance reasons, they are blocking locks (no
-
Has a
releasemethod: in case that theSourceno longer has the item after acquiring the lock, it can bereleasedwithout completing theListener. It does not have to be run when theListeneriscompleted: theListenerwill release the lock by itself! (See the diagram below) -
Exposes a
selfNumber, which is required to be unique for eachListener.This is used in the case where a
Sourceneeds to synchronize between multiple listeners, to prevent deadlocking.One can simply use a
NumberedLockto implementListenerLockwith the appropriate numbering.
Source and Listener interaction
The diagram for the handshake process between a Source and a Listener is as follow:

However, if Source's status for the item never changes (item is always available),
Listener.completeNow
is a shortcut handler for the entire flow, only letting you know whether the listener was alive and
received the item.
The handshake process also allows for more complex scenarios, involving synchronization of multiple handlers.
For example,
the implementation of SyncChannel
involves a sending and a receiving listener both being alive to proceed.
We shall not go into details in this guide.
Cancellables and Scoping rules
In the section about Structured Concurrency, we learned that
Async contexts can manage linked Futures.
Let's explore this concept in details in this section.
CompletionGroup
Every Async context created from all sources (Async.blocking, Async.group and Future.apply) creates a brand new
CompletionGroup, which is a set of Cancellables.
Upon the end of the Async context's body, this CompletionGroup handles calling .cancel on all its Cancellables
and wait for all of them to be removed from the group.
Adding a Cancellable to a Group is done by calling
add on the Group,
or calling link
on the Cancellable.
Since every Async context contains a CompletionGroup,
an overload of link exists
that would add the Cancellable to the Async context's group.
Async-scoped values
If you have data that needs to be cleaned up when the Async scope ends, implement Cancellable and link the data
to the Async scope:
case class CloseCancellable(c: Closeable) extends Cancellable:
def cancel() =
c.close()
unlink()
extension (closeable: Closeable)
def link()(using Async) =
CloseCancellable(closeable).link()
However, note that this would possibly create dangling resources that links to the passed-in Async context
of a function:
def f()(using Async) =
def resource = new Resource()
resource.link() // links to Async
def main() =
Async.blocking:
f()
// resource is still alive
g()
// ...
0
// resource is cancelled *here*
Unlinking
Just as Cancellables can be linked to a group, you can also unlink them from their current group.
This is one way to create a dangling active Future that is indistinguishable from a passive Future.
(Needless to say this is very much not recommended).
To write a function that creates and returns an active Future, write a function that takes an Async.Spawn context:
def returnsDangling()(using Async.Spawn): Future[Int] =
Future:
longComputation()
Again, this is a pattern that breaks structured concurrency and should not be used carelessly! One should avoid exposing this pattern of functions in a Gears-using public API of a library.