Sources as a Concurrency Primitive
With async functions, active Futures, and channels, we should be able to write our programs and make them easy to reason about with structured concurrency.
However, we often use concurrency to deal with asynchronous external events, such as file I/O, networking, sockets, ... From the program's point of view, this is completely unstructured: we ask for an operation, and at some point in the future (out of our control), we get back a response. This does not mix well with our current concepts, where callers always manage their own execution flow and structure.

To deal with such asynchronous events, Gears provides interfaces that act as bridges between the unstructured world and
structured concurrency programs.
These come in the form of Async.Sources.
Callbacks and push
The most basic concept of requesting an asynchronous operation is through a callback.
Let's illustrate this with a very simple example: reading a file.
object File:
  def readString(path: String)(callback: Either[Error, String] => Unit): Cancellable
Suppose we have a non-blocking API for reading the file at path. The content of the file will be read into a String, which
is passed into callback (or, if errors come up, they also get passed into callback).
Rather than the operation result, readString's caller receives a request object, which has a cancel method to attempt
to cancel the read operation.

The flow of callback-based operations is as follows:
- body calls File.readString(path)(callback), getting back a Cancellable immediately.
- body does not control when or how callback is called, and has no provided way to interact with callback.

  In reality, should body want to wait for callback's result, it can either
  - Put all handling logic past readString into callback, turning callback into a full continuation.
  - Introduce blocking/suspending through some synchronization primitive (channel, mutex).

  Without Async the latter option introduces blocking code, which effectively nullifies the advantage of using a non-blocking API in the first place. Therefore, traditional languages with callbacks (Node.js is an infamous example) chose the former, creating the problem of callback hell.
- At some time in the future, callback is called with the result. This might happen while body is still executing, or after. Depending on parallelism capabilities, it may be executed in parallel to body.
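The flow above can be sketched with a small, self-contained model. Everything here is an assumption made for illustration: FakeFile is a hypothetical in-memory stand-in for the File API, Cancellable is a local trait rather than the Gears one, and errors are typed as Throwable. The sketch shows the first option from the list, continuation style, where even two sequential reads already need nested callbacks.

```scala
// Local stand-in for the request handle described above (not the Gears trait).
trait Cancellable:
  def cancel(): Unit

// Hypothetical callback-based API, backed by an in-memory map instead of a disk.
object FakeFile:
  private val files = Map("/a.txt" -> "hello", "/b.txt" -> "world")

  def readString(path: String)(callback: Either[Throwable, String] => Unit): Cancellable =
    val task: Runnable = () =>
      // Step 3: the callback runs later, on a thread the caller does not control.
      callback(files.get(path).toRight(new java.io.FileNotFoundException(path)))
    val worker = new Thread(task)
    worker.start()
    // Step 1: the caller gets this handle back immediately.
    new Cancellable:
      def cancel(): Unit = worker.interrupt()

// Continuation style: everything after each read moves into its callback,
// so two sequential reads need two levels of nesting.
def readBoth(first: String, second: String)(k: Either[Throwable, String] => Unit): Unit =
  FakeFile.readString(first) {
    case Left(err) => k(Left(err))
    case Right(a) =>
      FakeFile.readString(second) {
        case Left(err) => k(Left(err))
        case Right(b)  => k(Right(s"$a $b"))
      }
  }
```

Note that readBoth itself has to take a continuation k: once one step is callback-based, every caller above it is pushed into the same style.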
How Async makes callbacks easier
In Gears however, we have Async computations: computations that can suspend to wait for a future value! This means,
given the same API, we can do the following:
- Suspend the current Async context
- Issue the asynchronous request, passing the resumption of the Async context as the callback.

And, in essence, this is exactly what Async.await does!
Let's have a now fully-expanded signature of Async.await:
trait Async:
  def await[T](src: Source[T]): T
With our knowledge of the Async-callback flow, we know that src would be some callback-taking future value.
Async.await would then do the suspend / set-resume-callback procedure, and return the output of the src as if we continued
from within the callback.
This suspend/resume mechanism is how Async lets you await concurrent Futures (a Future is also a Source!)
without being inside a Future yourself, as monadic style would require.
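The suspend / set-resume-callback procedure can be sketched without Gears. This is a rough model under stated assumptions, not how Gears implements Async.await: CallbackSource, awaitSketch and delayed are hypothetical names, and blocking the calling thread on a CompletableFuture stands in for suspending the Async context.

```scala
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for a callback-taking future value.
trait CallbackSource[T]:
  def onComplete(callback: T => Unit): Unit

// Sketch of "suspend, then pass the resumption as the callback".
// Assumption: blocking on `resumption.get()` plays the role of suspension.
def awaitSketch[T](src: CallbackSource[T]): T =
  val resumption = new CompletableFuture[T]()
  // The callback resumes the waiting caller with the produced value.
  src.onComplete(value => { resumption.complete(value); () })
  // The caller waits here and continues with the value once resumed.
  resumption.get()

// A demo source that delivers `value` from another thread after `millis` ms.
def delayed[T](value: T, millis: Long): CallbackSource[T] =
  new CallbackSource[T]:
    def onComplete(callback: T => Unit): Unit =
      val task: Runnable = () =>
        Thread.sleep(millis)
        callback(value)
      new Thread(task).start()
```

With this, awaitSketch(delayed("done", 20L)) reads like a direct call and returns the value, even though the value is produced by a callback on another thread.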
The Source trait
Now that we have an idea of what a Source is, let's look at how
Source is defined:
trait Source[+T]:
  def onComplete(listener: Listener[T]): Unit
  def poll(listener: Listener[T]): Boolean
  def poll(): Option[T]
  def dropListener(listener: Listener[T]): Unit

  inline def awaitResult(using Async): T = Async.await(this)

// Included for convenience, simplified...
trait Listener[-T]:
  def complete(item: T, origin: Source[T]): Unit
- onComplete does exactly what you expect: when an item is available, listener is called with the item. listener also gets a reference to the Source itself, in case it was passed to multiple Sources and needs to distinguish which source the item came from.
- poll(listener) works as a non-blocking poll: if an item is immediately available, call the listener with it immediately. Otherwise, immediately return false, and usually this means you'd want to put your listener into onComplete instead.

  If an item is all you need, poll() is a much more convenient overload: you just get the item back, or None if it was not immediately available.
- dropListener signals that listener is no longer needed to be called. This usually signals the cancellation of the value receiver.

  Note that this is not cancelling the Source itself: Source does not have a concept of cancellation! Not all asynchronous operations are cancellable.
Finally, awaitResult is just a convenience postfix method for Async.await.
In special cases where T is Either or Try (Future[T] is a Source[Try[T]]), we have .await as a convenience
postfix method that also unwraps the value!
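To make the four methods concrete, here is a minimal single-threaded model of a source that resolves once. MiniSource, MiniListener, OneShot and Recorder are hypothetical names that mirror the shape of the traits above; they are not the Gears definitions, they are not thread-safe, and having onComplete call the listener right away when a value already exists is an assumption of this model.

```scala
import scala.collection.mutable.ListBuffer

// Simplified local models of the traits above (not the Gears definitions).
trait MiniListener[-T]:
  def complete(item: T, origin: MiniSource[T]): Unit

trait MiniSource[+T]:
  def onComplete(listener: MiniListener[T]): Unit
  def poll(listener: MiniListener[T]): Boolean
  def poll(): Option[T]
  def dropListener(listener: MiniListener[T]): Unit

// A source that resolves at most once. Single-threaded sketch only.
final class OneShot[T] extends MiniSource[T]:
  private var value: Option[T] = None
  private var waiting: List[MiniListener[T]] = Nil

  def resolve(item: T): Unit =
    if value.isEmpty then
      value = Some(item)
      val toNotify = waiting.reverse // notify in attachment order
      waiting = Nil
      toNotify.foreach(_.complete(item, this))

  // Deliver immediately if possible, otherwise remember the listener.
  def onComplete(listener: MiniListener[T]): Unit =
    if !poll(listener) then waiting = listener :: waiting

  def poll(listener: MiniListener[T]): Boolean =
    value match
      case Some(item) =>
        listener.complete(item, this)
        true
      case None => false

  def poll(): Option[T] = value

  // Forget the listener; the source itself keeps running.
  def dropListener(listener: MiniListener[T]): Unit =
    waiting = waiting.filterNot(_ eq listener)

// A listener that records what it received.
final class Recorder(name: String, log: ListBuffer[String]) extends MiniListener[Int]:
  def complete(item: Int, origin: MiniSource[Int]): Unit =
    log += s"$name got $item"

def demoSource(): (Option[Int], Option[Int], List[String]) =
  val log = ListBuffer[String]()
  val src = new OneShot[Int]
  val kept = new Recorder("kept", log)
  val dropped = new Recorder("dropped", log)
  val before = src.poll() // nothing available yet
  src.onComplete(kept)
  src.onComplete(dropped)
  src.dropListener(dropped) // only the listener is dropped, not the source
  src.resolve(7)
  (before, src.poll(), log.toList)
```

In demoSource, the dropped listener is never called, yet the source still resolves and can be polled afterwards, which matches the note that dropListener does not cancel the Source.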
Persistent and Ephemeral Sources
So far, things seem clear when we have one Source and one Listener.
What happens when we pass multiple listeners into a Source?
The answer depends a lot on what the source is representing:
- Should we write the above readString function in terms of Source instead:

      object File:
        def readString(path: String): Source[Either[Error, String]] & Cancellable

  then the Source returned represents a single asynchronous request. This request returns once, so no matter how many listeners are attached to it, they should get the same value every time.

  We call Sources where all listeners get the same value, resolved at the same time [1], a persistent source. In particular, if a source is both persistent and cancellable, it is a Future. We shall see the "real" interface of a Future in the next section.

      // "correct" definition
      object File:
        def readString(path: String): Future[Either[Error, String]]

- Suppose we want to capture signals sent to our program. We might use a signal handler like this

      object Signals:
        def capture(signals: Signal*): Source[Signal]

  in which our Source represents a subscription to the system process' signal handler. This will deliver multiple signals to our program, and each listener attached to this Source should get one, possibly different signal. We call Sources that deliver possibly different values to listeners an ephemeral source.

  Another example of an ephemeral source is .readSource of a channel.
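The difference between the two kinds can be sketched with two small single-threaded models. PersistentModel and EphemeralModel are hypothetical names, listeners are plain functions rather than Gears Listeners, and the buffering of undelivered items in the ephemeral model is an assumption of this sketch, not a statement about how a real signal source behaves.

```scala
import scala.collection.mutable

// Persistent: resolves once; every listener observes the same value.
final class PersistentModel[T]:
  private var result: Option[T] = None
  private val waiting = mutable.Queue[T => Unit]()

  def resolve(item: T): Unit =
    if result.isEmpty then
      result = Some(item)
      waiting.dequeueAll(_ => true).foreach(listener => listener(item))

  def onComplete(listener: T => Unit): Unit =
    result match
      case Some(item) => listener(item) // late listeners get the same value
      case None       => waiting.enqueue(listener)

// Ephemeral: each item is handed to exactly one listener.
final class EphemeralModel[T]:
  private val items = mutable.Queue[T]()
  private val waiting = mutable.Queue[T => Unit]()

  def emit(item: T): Unit =
    if waiting.nonEmpty then waiting.dequeue()(item)
    else items.enqueue(item) // assumption: undelivered items are buffered

  def onComplete(listener: T => Unit): Unit =
    if items.nonEmpty then listener(items.dequeue())
    else waiting.enqueue(listener)

def demoKinds(): (List[Int], List[String]) =
  val persistentSeen = mutable.ListBuffer[Int]()
  val p = new PersistentModel[Int]
  p.onComplete(v => persistentSeen += v) // attached before resolution
  p.resolve(1)
  p.onComplete(v => persistentSeen += v) // attached after resolution

  val ephemeralSeen = mutable.ListBuffer[String]()
  val e = new EphemeralModel[String]
  e.onComplete(s => ephemeralSeen += s) // first listener waits
  e.emit("SIGINT")                      // delivered to the first listener
  e.emit("SIGTERM")                     // no listener waiting, buffered
  e.onComplete(s => ephemeralSeen += s) // second listener gets the next item
  (persistentSeen.toList, ephemeralSeen.toList)
```

Both listeners of the persistent model see the same value, while the two listeners of the ephemeral model each consume a different item.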
Active and Reactive Sources
The activeness or reactivity of a Source refers to whether values are resolved to the Source regardless of interactions with it (that is, adding or removing listeners).
Some sources, like an active Future or an asynchronous request like File.readString, will complete and
resolve with a value whether or not a listener is attached.
Other sources, like Channel.readSource, do not do anything on creation. They only attempt to
consume a value from the channel when a listener is attached.
This distinction matters when one decides to create a Source but not await it.
There is no way to tell the two kinds of sources apart from the signature alone, hence creating a Source without awaiting it should be considered an anti-pattern.
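A small model can show why the distinction is invisible from the outside. ActiveModel and ReactiveModel are hypothetical names with the same interface; running the work eagerly in the constructor is an assumption that stands in for an operation that proceeds on its own.

```scala
// Active: the work runs at creation, whether or not anyone listens.
final class ActiveModel[T](compute: () => T):
  private val result: T = compute()
  def onComplete(listener: T => Unit): Unit = listener(result)

// Reactive: the work runs only when a listener is attached.
final class ReactiveModel[T](compute: () => T):
  def onComplete(listener: T => Unit): Unit = listener(compute())

def demoActivity(): (Int, Int, Int) =
  var activeRuns = 0
  var reactiveRuns = 0
  // Created but never awaited: the work still happened.
  val active = new ActiveModel(() => { activeRuns += 1; "a" })
  // Created but not yet awaited: nothing has happened.
  val reactive = new ReactiveModel(() => { reactiveRuns += 1; "r" })
  val reactiveBefore = reactiveRuns
  reactive.onComplete(_ => ()) // attaching a listener triggers the work
  (activeRuns, reactiveBefore, reactiveRuns)
```

Both classes expose the same onComplete signature, so a caller who creates one and never attaches a listener cannot tell from the types whether any work was done.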
[1] It is hard to say that concurrent listeners are resolved "at the same time".
We usually reason about this in terms of subsequent listeners: if one listener is attached to a persistent Source
after another has been resolved, the second listener should be resolved immediately with the same value.
Consequently, polling this Source will always give the same value back immediately.