Introduction to Gears

This is a book about Gears, an experimental asynchronous programming library for Scala 3!

Gears aims to provide a basis for writing cross-platform, high-level asynchronous code with direct-style Scala and structured concurrency, while allowing library implementations to deal with external asynchronous I/O through featureful primitives and expose a simple direct-style API.

While Gears is currently at an experimental stage (definitely not recommended for production!), we provide basic support for

  • Virtual-threads-enabled JVM environments (JRE 21 or later, or JRE 19 with experimental virtual threads enabled)
  • Scala Native 0.5.0 or later with delimited continuations support (on Linux, macOS and BSDs).

Adding Gears to your own Scala Project

Gears can be added to your own project by adding the following dependency with your preferred build tool:

  • With sbt:
    libraryDependencies += "ch.epfl.lamp" %%% "gears" % "0.2.0",
    
  • With mill:
    def ivyDeps = Agg(
      // ... other dependencies
      ivy"ch.epfl.lamp::gears::0.2.0"
    )
    
  • With scala-cli:
    //> using dep "ch.epfl.lamp::gears::0.2.0"
    

Setting up gears

Running Examples in this Book

The book contains some examples in the src/scala directory. For example, this is the hello example, located in src/scala/hello.scala:

//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"

import gears.async.*
import gears.async.default.given

@main def main() =
  Async.blocking:
    val hello = Future:
      print("Hello")
    val world = Future:
      hello.await
      println(", world!")
    world.await

All such examples can be run with scala-cli provided the following dependencies:

  • A locally published version of gears.
  • Additional platform dependencies, as described below.

Compiling gears

To compile gears from source, clone gears from the GitHub repository:

git clone https://github.com/lampepfl/gears --recursive

Then, from the cloned directory, run sbt rootJVM/publishLocal.

For Scala Native

Make sure to prepare the additional dependencies for Scala Native. You will also need to locally publish a custom version of munit for Scala Native 0.5.0, which is available as a git submodule from the gears repository.

To locally publish munit, from the root directory of gears repository:

git submodule update            # Create and update submodules
dependencies/publish_deps.sh # Locally publish pinned dependencies

Finally, gears artifacts for Scala Native can be locally published by

sbt rootNative/publishLocal

Trying out examples

On JVM

As mentioned in the introduction, we require a JVM version that supports virtual threads. With that provided, examples can be run simply with

scala-cli run "path-to-example"

For the hello example:

scala-cli run src/scala/hello.scala

On Scala Native

The current Scala Native version required by Gears is 0.5.1. Therefore, examples in this book hardcode 0.5.1 as the Scala Native version, which is also the version gears is currently compiled against.

Examples can be run with

scala-cli --platform scala-native run "path-to-example"

For the hello example:

scala-cli --platform scala-native run src/scala/hello.scala

An "Async" function

The central concept of Gears programs is the function taking an Async context, commonly referred to in this book as an async function.

import gears.async.*

def asyncFn(n: Int)(using Async): Int =
                       // ^^^^^ an async context
    AsyncOperations.sleep(n * 100 /* milliseconds */)/*(using Async)*/
    n + 1

We can look at the Async context from two points of view, both as a capability and a context:

  • As a Capability, Async represents the ability to suspend the current computation, waiting for some event to arrive (such as timeouts, external I/Os or even other concurrent computations). While the current computation is suspended, we allow the runtime to use the CPU for other tasks, effectively utilizing it better (compared to just blocking the computation until the wanted event arrives).
  • As a Context, Async holds the necessary runtime information for the scheduler to know which computation should be suspended and how to resume it later. It also contains a concurrency scope, which we will see later in the structured concurrency section.

However, unlike other languages with a concept of async functions, Gears' async functions are just normal functions with an implicit Async parameter! This means we can also explicitly take the parameter as opposed to using Async:

def alsoAsyncFn(async: Async): Int =
    asyncFn(10)(using async)

and do anything with the Async context as if it was a variable [1]!

Passing on Async contexts

Let's look at a more fully-fledged example, src/scala/counting.scala:

//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"

import gears.async.*
import gears.async.default.given

/** Counts to [[n]], sleeping for 100 milliseconds in between. */
def countTo(n: Int)(using Async): Unit =
  (1 to n).foreach: i =>
    AsyncOperations.sleep(100 /* milliseconds */ ) /*(using Async)*/
    println(s"counted $i")

@main def Counting() =
  Async.blocking: /* (async: Async) ?=> */
    countTo(10)
    println("Finished counting!")

(If you see //> directives in an example, it means the example is self-contained and can be run as-is!)

Let's look at a few interesting points in this example:

  1. Async.blocking: Async.blocking gives you a "root" Async context, given implementations of the supporting layer (neatly provided by import gears.async.default.given!).

    This defines a "scope", not too different from scala.util.Using or scala.util.boundary. As usual, the context provided within the scope should not escape it. Like scala.util.boundary's boundary.Label, the Async context will be passed implicitly to other functions taking an Async context. Note that Async.blocking is the only provided means of creating an Async context out of "nothing", so it is easy to keep track of when adopting gears into your codebase!

  2. countTo returns Unit instead of Future[Unit] or an equivalent. This indicates that calling countTo will "block" the caller until countTo has returned with a value. No await needed, and countTo behaves just like a normal function!

    Of course, with the Async context passed, countTo is allowed to perform operations that suspend, such as sleep (which suspends the Async context for the given number of milliseconds).

    This illustrates an important concept in Gears: in most common cases, we write functions that accept a suspendable context, and calling them will block until they return! While it is completely normal to spawn concurrent/parallel computations and join/race them, as we will see in the next chapter, "natural" APIs written with Gears should have the same control flow as non-Async functions.

  3. Within countTo, note that we call sleep under a function passed into Seq.foreach, effectively capturing Async within the function. This is completely fine: foreach runs the given function in the same Async context (neither outside it nor in a sub-context), and does not capture the function. Our capability and scoping rules are maintained, and foreach is Async-capable by default!

    While this illustrates the above fact, we could've just written the function in a familiar fashion with a for comprehension:

    /** Counts to [[n]], sleeping for 100 milliseconds in between. */
    def countTo(n: Int)(using Async): Unit =
      for i <- 1 to n do
        AsyncOperations.sleep(100.millis)
        println(s"counted $i")
    

That's all for now with the Async context. Next chapter, we will properly introduce concurrency to our model.

Aside: Pervasive Async problem?

At this point, if you've done asynchronous programming with JavaScript, Rust or C# before, you might wonder if Gears is offering a solution that comes with the What color is your function? problem.

It is true that the Async context divides the function space into ones requiring it and ones that don't, and generally you need an Async context to call an async function [2]. However:

  • Writing async-polymorphic higher-order functions is trivial in Gears: if you don't care about Async contexts when taking in function arguments (that is, both () => T and Async ?=> T should work), simply take () => T, and Async-aware blocks will inherit the context from the caller!

    One obvious example is Seq.foreach from above. In fact, all current Scala collection APIs should still work with no changes. Of course, when applied repeatedly the function will be run sequentially rather than concurrently, but that is the expected behavior of Seq.foreach.

  • The presence of an Async context explicitly declares that a certain function requires runtime support for suspension, as well as the presence of a concurrent scope (for cancellation purposes). Ultimately, that means the compiler does not have to be pessimistic and compile all functions in a suspendable way (a la Go), which would hurt both performance and interoperability (especially with C on Scala Native).

    For the user, it is also a clear indication that calling the function will suspend to wait for (in most cases) external events (IO, filesystem, sleep, ...), and that they should prepare accordingly. Likewise, libraries using Gears should not be performing such tasks if they don't take an Async context.
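As a small illustration of the first point, here is a sketch of a higher-order function that takes a plain () => T and is then called with a block that suspends. The names logged, slowDouble and doubleAll are made up for this example; only Async.blocking and AsyncOperations.sleep come from Gears.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import gears.async.*
import gears.async.default.given

/** A plain higher-order function: it takes () => T and knows nothing about Async. */
def logged[T](label: String)(body: () => T): T =
  println(s"starting $label")
  val result = body()
  println(s"finished $label")
  result

/** An async function (hypothetical helper): it needs an Async context to suspend. */
def slowDouble(n: Int)(using Async): Int =
  AsyncOperations.sleep(50 /* milliseconds */ )
  n * 2

/** The lambdas below capture the caller's Async context, so they are allowed to suspend. */
def doubleAll(xs: Seq[Int])(using Async): Seq[Int] =
  logged("doubling")(() => xs.map(x => slowDouble(x)))

@main def polymorphic() =
  Async.blocking:
    println(doubleAll(Seq(1, 2, 3))) // List(2, 4, 6), computed one element at a time
```

Neither logged nor Seq.map had to be written with Async in mind; they simply run the function they are given in the caller's context.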

With that in mind, it is useful (as with all languages with async functions) to treat Async like a capability: only pass it to functions handling async operations, and try to isolate business logic into functions that don't need it.

[1] While in principle this is possible, capability and scoping rules apply to the Async context: functions taking Async capabilities should not capture it in a way that outlives the function's body execution. In the future, capture checking should be able to find such violations and report them during compilation.

[2] Technically, Async.blocking can be used to call any async function without an async context, but you should be aware of its pitfalls. That said, if you are migrating from a synchronous codebase, it is a functional bridge between the migrated and in-progress parts of the codebase.

Concurrency with Future

Future is the primary source of concurrency in a Gears program. There are actually two kinds of futures: passive and active. However, we shall look at active futures for now, and come back to passive futures in the next chapter.

Spawning a concurrent computation with Future.apply

Future.apply takes a body of the type Async.Spawn ?=> T (that is, an async computation) and runs it completely concurrently to the current computation. Future.apply returns immediately with a value of type Future[T]. Eliding details (that we shall come back to in the next chapter), there are two things you can do with this Future[T]:

  • Await: calling .await on the Future[T] suspends the current Async context until the Future's body returns, and gives you back the returned value T. If body throws an exception, .await will throw it as well. .awaitResult is the alternative where both cases are wrapped inside a Try.

    Being a suspension point, .await requires an Async context.

  • Cancel: calling .cancel() on the Future[T] sends a cancellation signal to the Future. This causes body's execution to receive a CancellationException at the next suspension point, and cancels all of the body's spawned Futures and so on, in a cascading fashion.

    After cancelling, .await-ing the Future will raise a CancellationException.
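The two operations can be sketched side by side as follows. The function name outcomes is made up for this example, and .awaitResult is used for the failing and cancelled futures so that the outcome can be inspected as a Try instead of being re-thrown.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import scala.util.Try
import gears.async.*
import gears.async.default.given

/** Spawns three futures and reports how each of them ended (hypothetical helper). */
def outcomes(): (Int, Try[Int], Try[String]) =
  Async.blocking:
    val ok = Future(21 * 2)
    val failing = Future[Int](throw IllegalStateException("boom"))
    val slow = Future:
      AsyncOperations.sleep(60_000 /* milliseconds */ )
      "done"
    slow.cancel() // the body is interrupted at its suspension point (the sleep)
    (ok.await, failing.awaitResult, slow.awaitResult)

@main def awaitAndCancel() =
  val (value, failed, cancelled) = outcomes()
  println(value)               // 42
  println(failed.isFailure)    // true: awaitResult wraps the thrown exception
  println(cancelled.isFailure) // true: a cancelled future fails when awaited
```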

An example

We can have a look at one simple example, implementing my favorite sorting algorithm, sleepsort!

//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"

import scala.collection.mutable
import scala.concurrent.duration.*
import gears.async.*
import gears.async.default.given

@main def sleepSort() =
  Async.blocking: /* (spawn: Async.Spawn) ?=> */
    val origin = Seq(50, 80, 10, 60, 40, 100)
    // Spawn sleeping futures!
    val buf = mutable.ArrayBuffer[Int]()
    origin
      .map: n =>
        Future /*(using spawn)*/: /* (Async) ?=> */
          AsyncOperations.sleep(n.millis)
          buf.synchronized:
            buf += n
      .awaitAll
    println(buf)

Let's walk through what's happening here:

  1. Starting from a Seq[Int], we .map the elements each to create a Future[Unit]. Calling Future: gives us a new Async context, passed into the Future's body. This Async context inherits the suspension implementation from Async.blocking's context, and has a sub-scope with Async.blocking's context as its parent. We will talk about scoping in the next section.
  2. In each Future, we sleep for as many milliseconds as the element's value. Note that sleep suspends the Async context given by the Future, i.e. the future's body, but not the one running under Async.blocking. Hence, it is totally okay to have thousands of Futures that sleep!
  3. .map gives us back a Seq[Future[Unit]] immediately, and we can wait for all of these futures to complete with the extension method .awaitAll. This suspends the Async.blocking context until all futures run to completion, and gives us back a Seq[Unit]. The return value is not interesting though; what we care about is the buf at the end.

Async.Spawn

If you noticed, Async.blocking gives you an Async.Spawn context (rather than just Async), which Future.apply requires. What is it?

Recall that Async, as a capability, gives you the ability to suspend the computation to wait for other values. Async.Spawn adds a new capability on top: it allows you to spawn concurrent computations as well.

Getting a Spawn capability is actually very simple. You get an Async.Spawn capability on Async.blocking by default, and both Future.apply and Async.group give you an Async.Spawn capability sub-scoped from a parent Async scope. Note, however, that most functions do not take an Async.Spawn context by default. This is due to the nature of Spawn's capability to spawn computations that run as long as the Spawn scope is alive, which typically corresponds to the lexical scope of the function. If functions take Async.Spawn, they are allowed to spawn futures that are still computing even after the function itself returns!

def fn()(using Async.Spawn) =
    val f = Future:
        useCPUResources()
    0

Async.blocking:
    val n = fn()
    // f is still running at this point!!!

This is way more difficult to reason about, so the "sane" behavior is to not take Async.Spawn by default, and only "upgrade" to it (with Async.group) inside a scope that will take care of cleaning up these "dangling" futures on return.

Most of Gears' library is structured this way, and we strongly recommend you do the same in your function designs.
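A minimal sketch of this recommendation: the function below only asks for Async, and upgrades to Async.Spawn internally with Async.group, so no future it spawns can outlive the call. The name parallelSum is made up for this example.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import gears.async.*
import gears.async.default.given

/** Sums the two halves of xs concurrently (hypothetical helper). */
def parallelSum(xs: Seq[Int])(using Async): Int =
  Async.group: // Async.Spawn is only available inside this block
    val (left, right) = xs.splitAt(xs.length / 2)
    val leftSum = Future(left.sum)
    val rightSum = Future(right.sum)
    leftSum.await + rightSum.await
  // when the group returns, both futures have completed or been cancelled

@main def groupDemo() =
  Async.blocking:
    println(parallelSum(1 to 100)) // 5050
```

From the caller's point of view, parallelSum is an ordinary async function: it returns an Int and leaves nothing running behind.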

Groups, Scoping and Structured Concurrency

If you've been following the book thus far, you've seen a lot of mentions of "an Async scope" and "sub-scoping". What exactly are those "scopes", and why do they matter?

That's exactly what this section is about!

tl;dr: Gears Structured Concurrency

We can sum up the content of this section as below. For more details, read on!

  1. Async contexts form a tree, rooted at Async.blocking, with each node associated with a body computation. An Async context only returns after its body completes.
  2. A child Async context can be created by either
    • Async.group, creating a direct child context "blocking" its caller, granting Async.Spawn.
    • Future.apply, branching a concurrent computation linked to the current Async context.
  3. When body completes, all linked Futures are cancelled, and the Async context returns after the cancellation is completely handled.

With these three properties, we guarantee that calling async functions will properly suspend if needed, and clean up properly on return.

In Gears, Async scopes form a tree

Let's look at the only three [1] introduction points for an Async context in gears.async, and examine their signatures:

  1. Async.blocking's signature is

    object Async:
      def blocking[T](body: Async.Spawn ?=> T)(using AsyncSupport, Scheduler): T
    

    Just as the name suggests, Async.blocking gives you an Async context out of thin air, but in exchange, whatever suspending computation happens in body causes blocking to block. Hence, blocking only returns when body gives back a T, which blocking happily forwards back to the caller.

    Looking at it from the perspective of the call stack, blocking looks like this...

    [Figure: Scope with Blocking]

    Pretty simple, isn't it? With blocking, you run suspending code in a scope that cleans up itself, just like returning from a function!

  2. Async.group's signature is

    object Async:
      def group[T](inner_body: Async.Spawn ?=> T)(using Async): T
    

    Async.group takes an Async scope, and starts a child scope under it. This scope inherits the suspend and scheduling implementation of the parent Async, but gives the inner_body a scope of its own...

    [Figure: Scope with Group]

    It is actually almost exactly the same as Async.blocking from the perspective of the caller (with an Async context!). Async.group also "blocks" (suspending if possible, of course) until inner_body completes.

    So far so good, and our scope-stack is still a linear stack! Let's introduce concurrency...

  3. Finally, Future.apply's signature is

    object Future:
      def apply[T](future_body: Async.Spawn ?=> T)(using Async.Spawn): Future[T]
    

    Future.apply starts a concurrent computation that is bound to the given Async.Spawn scope. What does that mean? Looking at the call stack...

    [Figure: Scope with Future spawn]

    We can see that Future.apply immediately returns (to body) a reference to the Future. In the meantime, a new Async.Spawn scope (named Future.apply here) is created, running future_body. It is linked to the original Async.Spawn passed into Future.apply.

    We can now see that Async scopes, with both their associated bodies and linked Future.apply's Async scopes, now form a tree!

To illustrate what exactly happens with linked scopes as the program runs, let's walk through the cases of interactions between the scopes!

[1] Technically, Task.start also introduces an Async context, but it is exactly equivalent to Future.apply(Task.run()).

Interactions between scopes

.awaiting a Future

When you .await a future, the Async scope used to .await on that Future will suspend the computation until the Future returns a value (i.e. its future_body is complete).

[Figure: Scoping when awaiting a Future]

Note that the Async context passed into .await does not have to be the same Async.Spawn context that the Future is linked to! However, for reasons we'll see shortly, active Futures will always be linked to one of the ancestors of body's Async context (or the same one).

Another thing to note is that, upon completion, the Future unlinks itself from the Async.Spawn scope. This is not very important, as the Future already has a return value! But it is a good detail to notice when it comes to cleaning up.

Cleaning up: after body completes

What happens when body of an Async context completes, while some linked Future is still running?

[Figure: Context when body returns but Future is running]

In short: When body completes, all linked child scopes are .cancel()-ed, and Async will suspend until no more child scopes are linked before returning.

This is especially important! It means that futures that are never .awaited will get cancelled. Make sure to .await the futures you want to complete!
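To make this concrete, here is a sketch where a group's body returns a Future without awaiting it. Letting the Future escape its group is done purely for demonstration (it is exactly what the scoping rules advise against), and the name danglingResult is made up for this example.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import scala.util.Try
import gears.async.*
import gears.async.default.given

/** Returns the outcome of a future that nobody awaited inside its group. */
def danglingResult(): Try[String] =
  Async.blocking:
    // The group's body returns the Future itself, without awaiting it...
    val dangling = Async.group:
      Future:
        AsyncOperations.sleep(60_000 /* milliseconds */ )
        "finished"
    // ...so by the time we get here, the group has cancelled it.
    dangling.awaitResult

@main def danglingFuture() =
  println(danglingResult().isFailure) // true: the future was cancelled, not completed
```

Note that the program does not wait for a minute: the sleep is interrupted as soon as the group's body returns.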

[Figure: Context when Scope cancels]

Cleaning up: when cancel is signalled

During a Future's computation, if it is .cancel-ed mid-way, the body keeps running (as there is no way to suddenly interrupt it without data loss!).

However, if .await is called with the Future's Async context (i.e. a request to suspend), it will immediately throw a CancellationException. The body may catch and react to this exception, e.g. to clean up resources.

The Future context will only attempt to unlink after body has returned, whether normally or by unwinding an exception.
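The sketch below shows a body reacting to cancellation with a plain try/finally. The names cleanupRan, started and worker are made up for this example; the SyncChannel is only there to make sure the body is already running when we cancel it.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import java.util.concurrent.atomic.AtomicBoolean
import gears.async.*
import gears.async.default.given

/** Cancels a running future and reports whether its finally block ran. */
def cleanupRan(): Boolean =
  val cleanedUp = AtomicBoolean(false)
  Async.blocking:
    val started = SyncChannel[Unit]()
    val worker = Future:
      try
        started.send(())              // tell the parent that the body is running
        AsyncOperations.sleep(60_000) // suspension point: cancellation surfaces here
      finally cleanedUp.set(true)     // runs while the exception unwinds the body
    started.read()
    worker.cancel()
    worker.awaitResult
  // Async.blocking only returns after the worker's body has finished unwinding
  cleanedUp.get()

@main def cancelCleanup() =
  println(cleanupRan()) // true
```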

[Figure: Context when Scope cancels and its body awaits]

Delaying cancellation with uninterruptible

The above .await behavior (throwing CancellationException) can be delayed within a Future by running the code under an uninterruptible scope. This is useful for asynchronous clean-up:

Future:
  val resource = acquireResource()
  try
    useResource(resource)
  finally
    Async.uninterruptible:
      resource.cleanup()/*(using Async)*/

Without uninterruptible, suspensions done by resource.cleanup will again throw CancellationExceptions.

Working with multiple Futures

While writing code that performs computation concurrently, it is common to spawn multiple futures and then await them at the same time, with some particular logic.

Gears provides some tools to deal with multiple futures, both for a statically known number and unknown number of futures (i.e. if you have a Seq[Future[T]]).

Combining two futures

Future.zip takes two futures Future[A] and Future[B] and returns a Future[(A, B)], completing when both input futures do.

Async.blocking:
  val a = Future(1)
  val b = Future("one")
  val (va, vb) = a.zip(b).await // (1, "one")

If one of the input futures throws an exception, it gets re-thrown as soon as possible (i.e. without suspending until the other completes).

Future.or takes two Future[T]s and returns another Future[T], completing with the first value that one of the two futures succeeds with. If both throw, the exception that was thrown last is re-thrown.

Async.blocking:
  val a = Future(1)
  val b = Future:
    AsyncOperations.sleep(1.hour)
    2
  val c = Future:
    throw Exception("explode!")

  a.or(b).await // returns 1 immediately
  b.or(a).await // returns 1 immediately
  c.or(a).await // returns 1 immediately
  c.or(c).await // throws

  a.orWithCancel(b).await // returns 1 immediately, and b is cancelled

Future.orWithCancel is a variant of Future.or where the slower Future is cancelled.

Combining a Sequence of futures

Seq[Future[T]].awaitAll takes a Seq[Future[T]] and returns a Seq[T] when all futures in the input Seq are completed. The results are in the same order as the corresponding Futures in the original Seq. If one of the input futures throws an exception, it gets re-thrown as soon as possible (i.e. without suspending until the others complete).

It is a more performant version of futures.fold(_.zip _).await.

Async.blocking:
    val a = Future(1)
    val b = Future:
      AsyncOperations.sleep(1.second)
      2
    val c = Future:
      throw Exception("explode!")

    Seq(a, b).awaitAll // Seq(1, 2), suspends for a second
    Seq(a, b, c).awaitAll // throws immediately

awaitAllOrCancel is a variant where all futures are cancelled when one of them throws.

Seq[Future[T]].awaitFirst takes a Seq[Future[T]] and returns a T when the first future in the input Seq succeeds. If all input futures throw an exception, the last exception is re-thrown.

It is a more performant version of futures.fold(_.or _).await.

Async.blocking:
    val a = Future(1)
    val b = Future:
      AsyncOperations.sleep(1.second)
      2
    val c = Future:
      throw Exception("explode!")

    Seq(a, b).awaitFirst    // returns 1 immediately
    Seq(a, b, c).awaitFirst // returns 1 immediately

awaitFirstWithCancel is a variant where all other futures are cancelled when one succeeds.

Async.select

Async.select works similarly to awaitFirst, but allows you to attach a body to handle the returned value, so the futures being raced do not have to be of the same type (as long as the handlers return the same type).

Compared to creating more futures, Async.select also guarantees that exactly one of the handlers is run, so you can be more confident putting side-effects into the handlers.

Async.blocking:
  val a = Future(1)
  val b = Future("one")

  val v = Async.select(
    a.handle: va =>
      s"number $va was returned",
    b.handle: vb =>
      s"string `$vb` was returned"
  )

  println(v) // either "number 1 was returned", or "string `one` was returned"

Async.select takes SelectCase* varargs, so you can also .map a sequence of Future as well.

Async.blocking:
  val futs = (1 to 10).map(Future.apply)
  val number = Async.select(futs.map(_.handle: v => s"$v returned")*)

Future.Collector

Future.Collector takes a sequence of Future and exposes a ReadableChannel [1] returning the Futures as they are completed.

Collector allows you to manually implement handling of multiple futures as they arrive, should you need more complex behaviors than the ones provided.

For example, here is a simplified version of .awaitFirst:

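import scala.annotation.tailrec
import scala.util.{Failure, Success}

def awaitFirst[T](futs: Seq[Future[T]])(using Async): T =
  val len = futs.length
  val collector = Future.Collector(futs*)
  @tailrec def loop(failed: Int): T =
    // read() returns an Either; this sketch assumes the collector's channel
    // is not closed while there are still futures left to complete.
    val fut = collector.results.read().toOption.get
    fut.awaitResult match
      case Success(value) => value
      case Failure(exc) =>
        // every future has failed: re-throw the last exception
        if failed + 1 == len then throw exc
        else loop(failed + 1)
  loop(0)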

MutableCollector is a Collector that also lets you add more Futures after creation.

[1] Learn more about channels in a future chapter.

Supervising with Retries and Timeouts

With the ability to sleep non-blockingly, one might immediately think about implementing supervision of tasks with retrying behavior upon failure.

Gears provides some building blocks, as well as interfaces to customize the retry policies.

Timeout

The simplest supervision function is withTimeout, which takes a duration and an async block (Async ?=> T). It runs the async block racing with the timeout, and throws a TimeoutException if the timeout runs out before the async block completes, cancelling the async block afterwards.

Async.blocking:
  withTimeout(60.seconds):
    val body = request.get("https://google.com")
    // ...

withTimeoutOption is a variant that wraps the output in Option[T] instead, returning None on timeout.
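Here is a sketch exercising both variants against a deliberately slow computation. The names slowAnswer and timeouts are made up for this example, and it assumes that the exception thrown by withTimeout is java.util.concurrent.TimeoutException.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import java.util.concurrent.TimeoutException
import scala.concurrent.duration.*
import gears.async.*
import gears.async.default.given

/** A computation that takes far longer than we are willing to wait (hypothetical). */
def slowAnswer()(using Async): Int =
  AsyncOperations.sleep(5.seconds)
  42

def timeouts(): (Option[Int], Option[Int], String) =
  Async.blocking:
    val tooSlow = withTimeoutOption(50.millis)(slowAnswer()) // timer wins the race
    val inTime = withTimeoutOption(5.seconds)(7)             // block wins the race
    val thrown =
      try
        withTimeout(50.millis)(slowAnswer())
        "completed"
      catch case _: TimeoutException => "timed out" // assumed exception type
    (tooSlow, inTime, thrown)

@main def timeoutDemo() =
  println(timeouts()) // (None,Some(7),timed out)
```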

Retry

Similar to withTimeout, Retry takes a block and executes it, "blocking" the caller until its policy has finished.

Retry allows you to specify:

  • when to consider the block as complete (until success, until failure, never)
  • how many consecutive failures should fail the policy
  • how much to delay in between attempts through the Delay trait
    • Some delay policies are provided, such as a constant delay and an exponential backoff algorithm.

Async.blocking:
  Retry
    .untilSuccess
    .withMaximumFailures(5)
    .withDelay(Delay.exponentialBackoff(maximum = 1.minute, starting = 1.second, jitter = Jitter.full)):
      val body = request.get("https://google.com")
      // ...
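For something runnable without a network, here is a sketch that retries a block failing on its first two attempts. The name flakyWithRetry is made up for this example, and it assumes that Delay and Jitter live under the Retry object (hence the import of gears.async.Retry.{Delay, Jitter}).

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import scala.concurrent.duration.*
import gears.async.*
import gears.async.default.given
import gears.async.Retry.{Delay, Jitter} // assumed location of Delay and Jitter

/** Runs a block that fails twice before succeeding, and counts the attempts. */
def flakyWithRetry(): (String, Int) =
  var attempts = 0
  val result = Async.blocking:
    Retry
      .untilSuccess
      .withMaximumFailures(5)
      .withDelay(Delay.exponentialBackoff(maximum = 100.millis, starting = 10.millis, jitter = Jitter.full)):
        attempts += 1
        if attempts < 3 then throw RuntimeException(s"attempt $attempts failed")
        "ok"
  (result, attempts)

@main def retryDemo() =
  println(flakyWithRetry()) // (ok,3)
```

The caller only sees the final "ok": the two failed attempts and the delays in between are handled entirely by the policy.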

Inter-future communication with Channels

For immutable data, it is easy and recommended to directly capture references to them from within Futures. However, to share mutable data or to communicate between concurrent computations, Gears recommends following Effective Go's slogan on sharing:

Do not communicate by sharing memory; instead, share memory by communicating.

To facilitate this, Gears provides channels, a simple means of pipe-style communication between computations.

Channels

A channel can be thought of as a pipe between senders and readers, with or without a buffer in between. The latter differentiates between the types of channels provided by Gears. Nevertheless, all channels provide the following methods:

  • send takes a value T and attempts to send it through the channel. Suspends until the value has been received or stored in the channel's buffer (if one exists).

    If the channel is closed, send will throw ChannelClosedException.

  • read suspends until a value is available through the channel, consuming it from the channel.

    read returns Either[Closed, T], with Left(Closed) returned when the channel is closed.

  • close simply closes the channel.

Channels provide the following guarantees:

  • Two sequential sends (one send followed by another send, not done concurrently) will be read in the same order.
  • An item that is sent is always read exactly once, unless it was in the channel's buffer when the channel was closed.
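The three methods and the ordering guarantee can be sketched with a single sender and a single reader. A SyncChannel (an unbuffered channel) is used here so that no item can be left behind in a buffer when the channel is closed; the names receiveAll and drain are made up for this example.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import scala.annotation.tailrec
import gears.async.*
import gears.async.default.given

/** Sends 1 to 5 from a Future and collects everything the reader sees. */
def receiveAll(): List[Int] =
  Async.blocking:
    val ch = SyncChannel[Int]()
    Future:
      for i <- 1 to 5 do ch.send(i) // each send suspends until it is read
      ch.close()                    // no more values: readers now get a Left
    @tailrec def drain(acc: List[Int]): List[Int] =
      ch.read() match
        case Right(value) => drain(value :: acc)
        case Left(_)      => acc.reverse // channel closed
    drain(Nil)

@main def channelBasics() =
  println(receiveAll()) // List(1, 2, 3, 4, 5)
```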

Types of channels

Gears provides three types of channels:

  • SyncChannel are channels without a buffer, so every send will suspend until a corresponding read succeeds.

    Since there are no buffers, an item that is sent is always read exactly once.

  • BufferedChannel are channels with a buffer of a fixed size. sends will succeed immediately if there is space in the buffer, and suspend otherwise, until there is space for it.

    When the channel is closed, items in the buffer are dropped.

  • UnboundedChannel are channels with a buffer that is infinitely growable. sends always succeed immediately. In fact, UnboundedChannel provides sendImmediately that never suspends.

    When the channel is closed, items in the buffer are dropped.

Restricted channel interfaces

When writing functions taking in channels, it is often useful to restrict them to be read-only or send-only. Gears' Channel trait is composed of ReadableChannel, SendableChannel and Closeable, so any channel can be upcast to one of the above three interfaces.
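As a sketch of how this looks in practice, the two functions below each receive only the half of the channel they need. The names produce, consume and pipeline are made up for this example.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import gears.async.*
import gears.async.default.given

/** Can only send: the parameter type hides read and close. */
def produce(out: SendableChannel[Int])(using Async): Unit =
  for i <- 1 to 3 do out.send(i * 10)

/** Can only read: reads up to count values, skipping reads on a closed channel. */
def consume(in: ReadableChannel[Int], count: Int)(using Async): Seq[Int] =
  (1 to count).flatMap(_ => in.read().toOption)

def pipeline(): Seq[Int] =
  Async.blocking:
    val ch = SyncChannel[Int]() // a full channel: readable, sendable and closeable
    Future(produce(ch))         // passed as a SendableChannel[Int]
    consume(ch, 3)              // passed as a ReadableChannel[Int]

@main def restrictedChannels() =
  println(pipeline()) // Vector(10, 20, 30)
```

No explicit cast is needed: the restriction is expressed entirely in the parameter types.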

Async.select with channels

Async.select can be used with channels. To do so, use .readSource for reading and .sendSource(T) for sending, before attaching .handle and providing a handler.

Async.blocking:
  val fut = Future:
    AsyncOperations.sleep(1.minute)
    10
  val ch = SyncChannel[Int]()
  val readFut = Future:
    ch.read().right.get + 1

  Async.select(
    fut.handle: x => println(s"fut returned $x"),
    ch.sendSource(20).handle:
      case Left(Closed) => throw Exception("channel closed!?")
      case Right(()) =>
        println(s"sent from channel!")
        println(s"readFut returned ${readFut.await}")
  )

Similar to future handlers, Async.select guarantees that exactly one of the handlers is run. Not only that, it also guarantees that only the channel event whose handler is run will go through!

For example, in the following snippet, it guarantees that exactly one channel is consumed from in each iteration of the loop:

val chans = (1 to 2).map: i =>
  val chan = UnboundedChannel[Int]()
  chan.sendImmediately(i)
  chan

for i <- 1 to 2 do
  val read = Async.select(
    chans(0).readSource.handle: v => v,
    chans(1).readSource.handle: v => v,
  )
  println(read) // prints 1 2 or 2 1

It is possible to mix reads and sends within one Async.select, or mix channel operations with futures!
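A sketch of such a mix: one Async.select racing a channel read against a Future acting as a timer. The names firstEvent and timer are made up for this example; since the channel already holds a value and the timer sleeps for a minute, the read is the only event that can complete.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import gears.async.*
import gears.async.default.given

/** Races a channel read against a sleeping future and describes the winner. */
def firstEvent(): String =
  Async.blocking:
    val ch = UnboundedChannel[Int]()
    ch.sendImmediately(1) // the channel already has a value to read
    val timer = Future(AsyncOperations.sleep(60_000 /* milliseconds */ ))
    Async.select(
      ch.readSource.handle {
        case Right(v) => s"read $v"
        case Left(_)  => "channel closed"
      },
      timer.handle(_ => "timer fired")
    )
    // on return, Async.blocking cancels the still-sleeping timer

@main def mixedSelect() =
  println(firstEvent()) // read 1
```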

Channel in Use: Actor pattern

One of the common patterns of using channels is to write functions that continuously take inputs from one channel, process them, and send their output into another channel. This is sometimes referred to as an Actor, or a Communicating Sequential Process.

Let's look at one of the examples, inspired by Rob Pike's talk on concurrency patterns in Go. This is an implementation of a concurrent prime sieve.

//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
//> using nativeMode "release-full"

import java.io.Closeable
import scala.annotation.tailrec
import scala.util.boundary
import gears.async.*
import gears.async.default.given

// Sender that owns the channel
type OwnedSender[T] = SendableChannel[T] & Closeable

// Send the sequence 2, 3, 4, ..., until, and then close the channel
def generate(until: Int)(ch: OwnedSender[Int])(using Async) =
  for i <- 2 to until do ch.send(i)
  ch.close()

// Filter out multiples of k
def sieve(
    k: Int
)(in: ReadableChannel[Int], out: OwnedSender[Int])(using Async) =
  @tailrec def loop(): Unit =
    in.read() match
      case Left(_) => ()
      case Right(n) =>
        if n % k != 0 then out.send(n)
        loop()
  loop()
  out.close()

@main def PrimeSieve(n: Int) =
  Async.blocking:
    // set up sieves
    val inputChan = SyncChannel[Int]()
    // start generating numbers
    Future(generate(n)(inputChan))

    // Collect answers
    @tailrec def loop(input: ReadableChannel[Int]): Unit =
      input.read() match
        // no more numbers
        case Left(_) => ()
        // new prime number
        case Right(n) =>
          println(s"$n is prime")
          // filter out multiples of n
          val chan = SyncChannel[Int]()
          Future(sieve(n)(input, chan))

          loop(chan)

    loop(inputChan)

Some notable points:

  • generate and sieve are both using Async functions: they block the caller until their work has finished. Since we want to run them concurrently as actors, we spawn them with Future.apply.
  • Both are given channels to write into, and the permission to close them. This is a common pattern, useful when your channels are single-sender, multiple-reader [1]. In such cases, it is common for the sender to signal that there are no more inputs by simply closing the channel. All readers are then immediately notified, and in this case (where readers are senders themselves in a pipeline), they can forward this signal by closing their respective channels.

[1] In the case where multiple senders are involved, the logic is much more complicated. You might want to look into ChannelMultiplexer.

Sources as a Concurrency Primitive

With async functions, active Futures and channels, we should be able to write our programs and make them easy to reason about with structured concurrency.

However, we often use concurrency to deal with asynchronous external events, such as file I/O, networking, sockets, ... From the program's point of view, this is completely unstructured: often you ask for an operation, and at some point in the future (out of your control), you get back a response. This does not mix well with our current concepts, where callers always manage their own execution flow and structure.

[Figure: Unstructured concurrency]

To deal with such asynchronous events, Gears provides some interfaces to be used as bridges between the unstructured world and structured concurrency programs. These come in the form of Async.Sources.

Callbacks and push

The most basic concept of requesting an asynchronous operation is through a callback.

Let's illustrate this with a very simple example: reading a file.

object File:
  // Assumed to be provided by the platform; the body is left unimplemented here.
  def readString(path: String)(callback: Either[Error, String] => Unit): Cancellable = ???

Suppose we have a non-blocking API of reading a file at path. The content of the file will be read into a String, which is passed into callback (or, if errors come up, they also get passed into callback). Rather than the operation result, readString's caller receives a request object, which has a cancel method to attempt to cancel the read operation.

[Figure: Flow of callback]

The flow of callback-based operations is as follows:

  1. body calls File.readString(path)(callback), getting back a Cancellable immediately.

  2. body does not control when or how callback is called, and has no provided way to interact with callback.

    In reality, should body want to wait for callback's result, it can either

    • Put all handling logic past readString into callback, turning callback into a full continuation.
    • Introduce blocking/suspending through some synchronization primitive (channel, mutex).

    Without Async, the latter option introduces blocking code, which effectively nullifies the advantage of using a non-blocking API in the first place. Therefore, traditional languages and runtimes with callbacks (Node.js is an infamous example) chose the former, creating the problem of callback hell.

  3. At some time in the future, callback is called with the result. This might happen while body is still executing, or after. Depending on parallelism capabilities, it may be executed in parallel to body.

How Async makes callbacks easier

In Gears however, we have Async computations: computations that can suspend to wait for a future value! This means, given the same API, we can do the following:

  • Suspend the current Async context
  • Issue the asynchronous request, passing the resumption of the Async context as the callback.

[Figure: Flow of callback with Async]

And, in essence, this is exactly what Async.await does!

Let's now look at the fully expanded signature of Async.await:

trait Async:
  def await[T](src: Source[T]): T

With our knowledge of the Async-callback flow, we know that src would be some callback-taking future value. Async.await would then do the suspend / set-resume-callback procedure, and return the output of the src as if we continued from within the callback.

This suspend/resume mechanism is how Async is able to let you await concurrent Futures (which are also Sources!) without being inside a Future itself, monadic style.
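To make the suspend / set-resume-callback procedure concrete, here is a toy model written against the standard library only. It is not how Gears implements Async.await: ToySource, toyAwait and delayed are hypothetical names, and "suspension" is modelled by blocking the calling thread on a latch.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

// Toy stand-in for a callback-taking source; NOT the Gears Source trait.
trait ToySource[T]:
  def onComplete(callback: T => Unit): Unit

// "Suspends" the caller, passing its resumption as the callback.
def toyAwait[T](src: ToySource[T]): T =
  val result = new AtomicReference[Option[T]](None)
  val resumed = new CountDownLatch(1)
  src.onComplete: value =>
    result.set(Some(value)) // store the value for the suspended caller
    resumed.countDown() // "resume" the caller
  resumed.await() // "suspend" until the callback has fired
  result.get().get

// A source that invokes its callback from another thread after a delay.
def delayed[T](millis: Long)(value: T): ToySource[T] = new ToySource[T]:
  def onComplete(callback: T => Unit): Unit =
    val work: Runnable = () =>
      Thread.sleep(millis)
      callback(value)
    new Thread(work).start()

@main def runToyAwait() = println(toyAwait(delayed(10)("hello")))
```

From the caller's point of view, toyAwait simply returns the value, as if execution had continued from within the callback.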

The Source trait

Now that we have an idea of what a Source is, we can look at how Source is defined:

trait Source[+T]:
  def onComplete(listener: Listener[T]): Unit

  def poll(listener: Listener[T]): Boolean
  def poll(): Option[T]

  def dropListener(listener: Listener[T]): Unit

  inline def awaitResult(using Async): T = Async.await(this)

// Included for convenience, simplified...
trait Listener[-T]:
  def complete(item: T, origin: Source[T]): Unit

  • onComplete does exactly what you expect: when an item is available, listener is called with the item. listener also gets a reference to the Source itself, in case it was passed to multiple Sources and needs to distinguish which source the item came from.

  • poll(listener) works as a non-blocking poll: if an item is immediately available, it calls the listener with it right away. Otherwise, it immediately returns false, which usually means you'd want to pass your listener to onComplete instead.

    If an item is all you need, poll() is a much more convenient overload: you just get the item back, or None if it was not immediately available.

  • dropListener signals that listener no longer needs to be called. This usually signals the cancellation of the value receiver.

    Note that this is not cancelling the Source itself: Source does not have a concept of cancellation! Not all asynchronous operations are cancellable.

Finally, awaitResult is just a convenience postfix method for Async.await. In special cases where T is Either or Try (Future[T] is a Source[Try[T]]), we have .await as a convenience postfix method that also unwraps the value!
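A small sketch of poll() and awaitResult in use, with a channel's readSource as the Source. The helper name pollDemo is an assumption for illustration; results are converted with toOption only to avoid naming the channel's "closed" type.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import gears.async.*
import gears.async.default.given

// Hypothetical helper: polls an empty source, a ready source, then awaits.
def pollDemo(): (Option[Int], Option[Int], Option[Int]) =
  Async.blocking:
    val ch = UnboundedChannel[Int]()
    val src = ch.readSource
    val empty = src.poll().flatMap(_.toOption) // nothing buffered yet
    ch.sendImmediately(1)
    val ready = src.poll().flatMap(_.toOption) // item is immediately available
    ch.sendImmediately(2)
    val awaited = src.awaitResult.toOption // suspends only if nothing is ready
    (empty, ready, awaited)

@main def runPollDemo() = println(pollDemo())
```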

Persistent and Ephemeral Sources

So far, things seem clear when we have one Source and one Listener. What happens when we pass multiple listeners into a Source?

The answer depends a lot on what the source is representing:

  • Should we write the above readString function in terms of Source instead:

    object File:
      def readString(path: String): Source[Either[Error, String]] & Cancellable
    

    then the Source returned represents a single asynchronous request. This request returns once, so no matter how many listeners are attached to it, they should get the same value every time.

    We call Sources where all listeners get the same value, resolved at the same time [1], persistent sources. In particular, if a source is both persistent and cancellable, it is a Future. We shall see the "real" interface of a Future in the next section.

    // "correct" definition
    object File:
      def readString(path: String): Future[Either[Error, String]]
    
  • Suppose we want to capture signals sent to our program. We might use a signal handler like this:

    object Signals:
      def capture(signals: Signal*): Source[Signal]
    

    in which our Source represents a subscription to the system process' signal handler. This will deliver multiple signals to our program, and each listener attached to this Source should get one, possibly different signal. We call Sources that deliver possibly different values to listeners an ephemeral source.

    Another example of an ephemeral source is .readSource of a channel.
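The difference can be observed directly. In this sketch (persistentVsEphemeral is a hypothetical helper name), awaiting the same Future twice yields the same value, while awaiting the same channel readSource twice consumes two different items.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import gears.async.*
import gears.async.default.given

def persistentVsEphemeral(): (List[Int], List[Int]) =
  Async.blocking:
    // Persistent: every await observes the same resolved value.
    val fut = Future(42)
    val fromFuture = List(fut.await, fut.await)

    // Ephemeral: every await consumes the next item of the channel.
    val ch = UnboundedChannel[Int]()
    ch.sendImmediately(1)
    ch.sendImmediately(2)
    val src = ch.readSource
    val fromChannel = List(src.awaitResult, src.awaitResult).flatMap(_.toOption)

    (fromFuture, fromChannel)

@main def runPersistentVsEphemeral() = println(persistentVsEphemeral())
```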

Active and Reactive Sources

Activeness and Reactivity of Sources refer to whether values are resolved to the Source regardless of interactions with it (regarding adding/removing listeners).

Some sources, like an active Future or an asynchronous request like File.readString, will complete and resolve with a value regardless of whether a listener is attached to them.

Some other sources, like Channel.readSource, do not do anything on creation. They only attempt to consume a value from the channel when a listener is attached.

From the perspective of the Source, this matters when one decides to create a Source but not await it. There is no way to tell the two kinds of sources apart from the signature alone, hence creating a Source without awaiting it should be considered an anti-pattern.

[1] It is hard to say that concurrent listeners are resolved "at the same time". We usually reason about this in terms of subsequent listeners: if one listener is attached to a persistent Source after another has been resolved, the second listener should be resolved immediately with the same value. Consequently, polling this Source will always give the same value back immediately.

Passive Futures and Promises

As mentioned in the last section, the Future trait represents a persistent source that is also cancellable.

trait Future[+T] extends Source[Try[T]], Cancellable

As mentioned before, a Future can be active or passive [1]. We have already delved deeply into active Futures, so this section is all about passive Futures.

[1] Even passive Futures are considered active Sources by our definition.

So what are passive Futures? Gears makes a distinction between an active and a passive Future based on whether its computation is within or outside Gears' structured concurrency. Active Futures are actively driven by Gears, and passive Futures can be completed from outside of Gears' machinery.

Typically, passive Futures are used as a straightforward way to encapsulate callback-style functions into a Gears interface. We've seen how the interface is created in the last section; we shall now see how to provide the implementation.

Future.withResolver: JS-style callback-to-future conversion

Here is an example of how to implement a Future-returning version of readString given the callback one:

object File:
  def readString(path: String, callback: Either[Error, String] => Unit): Cancellable =
    // provided
    ???

  def readString(path: String): Future[Either[Error, String]] =
    Future.withResolver: resolver =>
      val cancellable = readString(path, result => resolver.resolve(result))
      resolver.onCancel: () =>
        cancellable.cancel()
        resolver.rejectAsCancelled()

  • Future.withResolver creates a passive Future where completion is managed by the Resolver parameter passed into body.

    The role of body is to set up callbacks that will complete the Future by resolving it with a value or failing it with an exception.

    In the case that the callback function is cancellable, body can set up through Resolver.onCancel to forward the cancellation of the Future into the callback function.

  • Unlike Future.apply, withResolver runs body directly, blocking the caller until body is complete. It returns the Future after body has finished setting it up.

If you are familiar with JavaScript, this is almost the same interface as its Promise constructor.
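For a runnable variant of the same pattern, the sketch below wraps java.util.Timer, a callback-style API from the Java standard library. The helper name after and the shared timer value are assumptions made for this example; the Resolver calls are the same ones used in the readString example above.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import java.util.{Timer, TimerTask}
import gears.async.*
import gears.async.default.given

// A daemon timer shared by all calls (illustrative setup).
private val timer = new Timer(true)

// Hypothetical helper: a passive Future that resolves with `value` after `millis`.
def after[T](millis: Long)(value: => T): Future[T] =
  Future.withResolver[T]: resolver =>
    val task = new TimerTask:
      def run(): Unit = resolver.resolve(value) // the "callback" completes the Future
    timer.schedule(task, millis)
    // Forward cancellation of the Future to the underlying timer task.
    resolver.onCancel: () =>
      task.cancel()
      resolver.rejectAsCancelled()

@main def runAfter() =
  Async.blocking:
    println(after(10)(5).await)
```

Note that after itself needs no Async context: the body only registers the timer task and returns, and the Future is completed later from the timer's thread.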

Converting between Scala and Gears futures

Since Scala's scala.concurrent.Future runs on its own ExecutionContext, it is considered outside of Gears' machinery, and hence can only be converted into a passive Gears Future.

This can be done by importing ScalaConverters and using the extension method .asGears on the Future.

Note that it requires an ExecutionContext, in order to run the code that would complete the Future. Cancelling this Future only forces it to return Failure(CancellationException), as Scala Futures cannot be cancelled externally.

Converting from a Gears Future, passive or active, to a Scala Future is also possible through the .asScala extension method. Futures created from active Gears futures will return Failure(CancellationException) once they go out of scope.
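A minimal sketch of the Scala-to-Gears direction, assuming ScalaConverters lives in the gears.async package and that the global ExecutionContext is acceptable for your program. StdFuture is just a local alias to keep the two Future types apart.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future as StdFuture
import gears.async.*
import gears.async.default.given
import gears.async.ScalaConverters.* // assumed location of the converters

def viaScalaFuture(): Int =
  Async.blocking:
    val std: StdFuture[Int] = StdFuture(21 * 2) // runs on the ExecutionContext
    val gearsFut: Future[Int] = std.asGears // passive Gears Future
    gearsFut.await

@main def runViaScalaFuture() = println(viaScalaFuture())
```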

Manual completion of Futures with Promise

In case you need a more complicated flow than a callback, Gears provides a more manual way of creating passive Futures, through a Future.Promise.

Simply put, it is a Future that you can manually complete with a value through a method.

Because there is no structure to a Promise, it is never recommended to return one directly. It should be upcast to Future (explicitly or with .asFuture) before being returned.

One cannot forward cancellation of a Promise.
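A short sketch of a Promise in use, assuming it is created with Future.Promise[T]() and completed with complete(Try[T]). The helper name promiseDemo is hypothetical. The Promise stays private to the function and only its Future view is awaited.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import scala.util.Success
import gears.async.*
import gears.async.default.given

def promiseDemo(): Int =
  Async.blocking:
    val promise = Future.Promise[Int]()
    val fut: Future[Int] = promise.asFuture // expose only the Future side
    // Complete the promise from a concurrent computation.
    Future:
      promise.complete(Success(7))
    fut.await

@main def runPromiseDemo() = println(promiseDemo())
```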

Select and Race

Async.select works on both Channels and Futures because it operates on Sources. Its behavior generalizes to any set of Sources: exactly one item is consumed from one of the sources, and the corresponding handler is run.

[Figure: Async.select machinery]

This behavior is achieved by passing into the Sources a special Listener that accepts only the first item resolved and rejects all others.

Learn more about failible listeners in the next section.

Async.race

Under the hood, Async.select uses a more general method to combine multiple Sources into one with a racing behavior.

object Async:
  def race[T](sources: Source[T]*): Source[T]

race takes multiple sources and returns a new ephemeral, reactive Source that:

  • once a listener is attached, it wraps the listener with a one-time check and forwards it to all sources.
  • when one source resolves this listener with a value, invalidates this listener on all other sources.

Note that Source.dropListener is inherently racy: one cannot reliably use dropListener to guarantee that a listener will not be run. This is the main motivation to introduce failible listeners.

Async.raceWithOrigin is a variant of race that also returns which of the origin sources the item comes from.
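As a minimal sketch, racing two channel read sources where only one has an item looks like this (raceDemo is a hypothetical helper name):

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import gears.async.*
import gears.async.default.given

def raceDemo(): Option[Int] =
  Async.blocking:
    val quiet = SyncChannel[Int]() // never receives a value
    val ready = UnboundedChannel[Int]()
    ready.sendImmediately(1)
    // The combined source resolves with whichever source delivers first.
    val raced = Async.race(quiet.readSource, ready.readSource)
    raced.awaitResult.toOption

@main def runRaceDemo() = println(raceDemo())
```

Since the raced source is reactive, nothing is consumed from either channel until awaitResult attaches a listener.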

Calling listeners and Source.transformValuesWith

From the description of race, you might notice a general pattern for transforming Sources with a .map-like behavior:

  • Given a transformation function f: T => U, one can transform a Source[T] to a Source[U] by creating a Source that...
  • Given a listener l: Listener[U], create a Listener[T] that applies f before passing to l...
  • ... and pass that wrapping listener back to Source[T].

And this is exactly what Source.transformValuesWith does.
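A small usage sketch, with a deliberately cheap transformation (doubled is a hypothetical helper name). The function passed to transformValuesWith receives the source's item type, which for a channel read is an Either:

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import gears.async.*
import gears.async.default.given

def doubled(): Option[Int] =
  Async.blocking:
    val ch = UnboundedChannel[Int]()
    ch.sendImmediately(21)
    // Wraps each listener so that items are doubled before delivery.
    val src = ch.readSource.transformValuesWith(_.map(_ * 2))
    src.awaitResult.toOption

@main def runDoubled() = println(doubled())
```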

This looks like a very useful method, so why isn't it a basic concept, with a familiar name (.map)? The devil lies in the details, specifically how Source notifies its Listeners.

Notice that there is no ExecutionContext or Async.Scheduler passed into onComplete. This is because Sources often represent values that come from outside the program's control, possibly running on another thread pool. We do not want to introduce a "middle ground" where spawning tasks is possible but implicitly lives outside of structured concurrency. With this limitation, we have no control over how much parallelism or how many resources a Source has available when it calls a Listener.

In Gears, we attempt to contain this limitation by asking Listener implementations to be as lightweight as possible. In most cases, completing a Listener involves just scheduling the resumption of the suspended Async context, so this is not a problem.

However, transformValuesWith allows one to easily run an unbounded computation within the transform function f. This is dangerous, so transformValuesWith has a long, tedious, greppable name to show its gnarly side.

I want to transform a Source, what do I do?

Explicitly spawn a Future that transforms each item and communicate with it through a Channel:

val ch = SyncChannel[U]()
Future:
  while true do
    val t = source.awaitResult
    ch.send(f(t))

However, this is not always what you want: think about whether you want f and send to run at the same time, or awaitResult and send, or what to do when f throws, ... Most of the time this is very situation-specific, so Gears does not provide a one-size-fits-all solution.

Locking and Failible Listeners

As mentioned in the previous section, Source.dropListener is not a perfect solution when it comes to managing listeners: dropListener possibly runs on a different thread than the one processing the Source items, which means there is no guarantee that calling dropListener before the listener is resolved will prevent the listener from being resolved.

However, sometimes we do want this guarantee. To achieve it, Gears allows a listener to declare itself dead; a Source calling a dead listener should not consume an item on that listener's behalf.

How does that work in detail? How do we declare a failible listener, and how do we handle one?

The Listener trait

Let's look at the Listener trait in detail:

trait Listener[-T]:
  def complete(item: T, origin: Source[T]): Unit
  val lock: ListenerLock | Null

trait ListenerLock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit

We've seen the complete method before in the Source section, which simply forwards the item to the listener, "completing" it.

However, a Listener may also declare itself to have a lock, which must be acquired before calling complete. Most listeners, however, are not failible and do not require a lock; in such cases the Listener just declares lock = null.

The listener lock:

  • Returns a boolean in its acquire method: failing to acquire the lock means that the listener is dead, and should not be completed.

    • For performance reasons, they are blocking locks (no using Async in acquire): Sources should be quick in using them!
  • Has a release method: in case that the Source no longer has the item after acquiring the lock, it can be released without completing the Listener. It does not have to be run when the Listener is completed: the Listener will release the lock by itself! (See the diagram below)

  • Exposes a selfNumber, which is required to be unique for each Listener.

    This is used in the case where a Source needs to synchronize between multiple listeners, to prevent deadlocking.

    One can simply use a NumberedLock to implement ListenerLock with the appropriate numbering.

Source and Listener interaction

The diagram for the handshake process between a Source and a Listener is as follows:

[Figure: Flow for a Source to process a listener]

However, if Source's status for the item never changes (item is always available), Listener.completeNow is a shortcut handler for the entire flow, only letting you know whether the listener was alive and received the item.

The handshake process also allows for more complex scenarios, involving synchronization of multiple handlers. For example, the implementation of SyncChannel involves a sending and a receiving listener both being alive to proceed. We shall not go into details in this guide.
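The simple case of the handshake (the item stays available) can be modelled with a toy version of the traits, written against the standard library only. ToyLock, ToyListener, OnceListener and offer are all hypothetical names: this is a simplified model of the protocol described above, not Gears' Listener implementation, and it leaves out selfNumber and the origin Source.

```scala
import java.util.concurrent.locks.ReentrantLock

trait ToyLock:
  def acquire(): Boolean // false means the listener is dead
  def release(): Unit

trait ToyListener[-T]:
  def complete(item: T): Unit
  val lock: ToyLock | Null

// A failible listener that accepts only the first item offered to it.
final class OnceListener[T](onItem: T => Unit) extends ToyListener[T]:
  private val mutex = new ReentrantLock()
  private var done = false
  val lock: ToyLock = new ToyLock:
    def acquire(): Boolean =
      mutex.lock()
      if done then
        mutex.unlock()
        false
      else true
    def release(): Unit = mutex.unlock()
  def complete(item: T): Unit =
    done = true
    lock.release() // the listener releases its own lock on completion
    onItem(item)

// Source side: acquire the lock (if any), then complete the listener.
def offer[T](listener: ToyListener[T], item: T): Boolean =
  val lock = listener.lock
  if lock == null then
    listener.complete(item)
    true
  else if lock.acquire() then
    listener.complete(item)
    true
  else false // dead listener: keep the item for someone else

@main def runOffer() =
  val l = OnceListener[Int](i => println(s"got $i"))
  println(offer(l, 1))
  println(offer(l, 2))
```

Offering a second item to the same OnceListener returns false without invoking it, which is the property Async.select relies on to consume exactly one item.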

Cancellables and Scoping rules

In the section about Structured Concurrency, we learned that Async contexts can manage linked Futures. Let's explore this concept in detail in this section.

CompletionGroup

Every Async context, however it is created (Async.blocking, Async.group or Future.apply), creates a brand new CompletionGroup, which is a set of Cancellables. At the end of the Async context's body, this CompletionGroup calls .cancel on all its Cancellables and waits for all of them to be removed from the group.

Adding a Cancellable to a Group is done by calling add on the Group, or calling link on the Cancellable. Since every Async context contains a CompletionGroup, an overload of link exists that would add the Cancellable to the Async context's group.
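The effect of a group ending can be sketched as follows, assuming Async.group returns the value of its body. A Future spawned inside the group is linked to the group's CompletionGroup, so letting it escape the group does not keep it running. The helper name cancelledByScope is hypothetical.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"

import gears.async.*
import gears.async.default.given

def cancelledByScope(): Boolean =
  Async.blocking:
    val escaped = Async.group:
      Future:
        SyncChannel[Int]().read() // suspends until cancelled
    // The group has ended, so the Future it spawned has been cancelled.
    escaped.awaitResult.isFailure

@main def runCancelledByScope() = println(cancelledByScope())
```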

Async-scoped values

If you have data that needs to be cleaned up when the Async scope ends, implement Cancellable and link the data to the Async scope:

case class CloseCancellable(c: Closeable) extends Cancellable:
  def cancel() =
    c.close()
    unlink()

extension (closeable: Closeable)
  def link()(using Async) =
    CloseCancellable(closeable).link()

However, note that this can create dangling resources that link to the Async context passed into a function:

def f()(using Async) =
  val resource = new Resource()
  resource.link() // links to Async

def main() =
  Async.blocking:
    f()
    // resource is still alive
    g()
    // ...
    0
    // resource is cancelled *here*

Unlinking

Just as Cancellables can be linked to a group, you can also unlink them from their current group. This is one way to create a dangling active Future that is indistinguishable from a passive Future. (Needless to say this is very much not recommended).

To write a function that creates and returns an active Future, write a function that takes an Async.Spawn context:

def returnsDangling()(using Async.Spawn): Future[Int] =
  Future:
    longComputation()

Again, this is a pattern that breaks structured concurrency and should not be used carelessly! One should avoid exposing this pattern of functions in a Gears-using public API of a library.

Structured concurrency patterns

Avoid returning Futures

Async.blocking: when to use them

Staying referentially transparent with Async ?=> T

Taking () => T vs. Async ?=> T

Incremental adoption in your codebase

Asynchronous Operations

Writing Cross-platform Gears libraries