Select and Race
Async.select works on both Channels and Futures because it is defined in terms of Sources.
More generally, it takes any number of Sources, consumes exactly one item from exactly one of them,
and runs the handler attached to that source.
Async.select machinery
This behavior is achieved by passing to each Source a special Listener that accepts only the
first item resolved and rejects all others.
Learn more about failible listeners in the next section.
Async.race
Under the hood, Async.select uses a more general method to combine multiple Sources into one
with a racing behavior.
object Async:
  def race[T](sources: Source[T]*): Source[T]
race takes multiple sources and returns a new ephemeral, reactive Source that:
- once a listener is attached, it wraps the listener with a one-time check and forwards it to all sources.
- when one source resolves this listener with a value, invalidates this listener on all other sources.
Note that Source.dropListener is inherently racy: one cannot reliably use dropListener to guarantee
that a listener will not be run.
This is the main motivation to introduce failible listeners.
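The one-time check described above can be sketched without Gears at all. The following is a minimal model, not Gears' implementation: MiniListener, MiniSource, ManualSource and miniRace are hypothetical names invented for this illustration, and the "invalidation" is reduced to a shared atomic flag that makes every offer after the first one fail.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins for Listener and Source (not the Gears types).
trait MiniListener[-T]:
  /** Returns true if the item was accepted, false if it was rejected. */
  def offer(item: T): Boolean

trait MiniSource[+T]:
  def onComplete(listener: MiniListener[T]): Unit

/** A source resolved by hand; meant for single-threaded demos only. */
final class ManualSource[T] extends MiniSource[T]:
  private val listeners = ArrayBuffer.empty[MiniListener[T]]
  def onComplete(listener: MiniListener[T]): Unit = listeners += listener
  /** Offers `item` to every attached listener and counts how many accepted it. */
  def resolve(item: T): Int = listeners.count(_.offer(item))

/** Combines sources so that an attached listener receives at most one item. */
def miniRace[T](sources: MiniSource[T]*): MiniSource[T] = new MiniSource[T]:
  def onComplete(listener: MiniListener[T]): Unit =
    // One flag per attached listener: this is the one-time check.
    val done = new AtomicBoolean(false)
    val wrapped = new MiniListener[T]:
      def offer(item: T): Boolean =
        // Only the first offer flips the flag; every later offer is rejected.
        if done.compareAndSet(false, true) then listener.offer(item)
        else false
    sources.foreach(_.onComplete(wrapped))

def raceDemo(): (Int, Int, List[String]) =
  val first = ManualSource[String]()
  val second = ManualSource[String]()
  val received = ArrayBuffer.empty[String]
  val collector = new MiniListener[String]:
    def offer(item: String): Boolean =
      received += item
      true
  miniRace(first, second).onComplete(collector)
  val acceptedFromSecond = second.resolve("from second") // accepted: 1
  val acceptedFromFirst = first.resolve("from first")    // rejected: 0
  (acceptedFromSecond, acceptedFromFirst, received.toList)
```

Because rejection happens inside the listener itself, the losing source learns that its item was not consumed, which is something removing the listener after the fact cannot guarantee.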
Async.raceWithOrigin is a variant of race that also returns which of the origin sources the item comes from.
Calling listeners and Source.transformValuesWith
From the description of race, you might notice a general pattern for transforming Sources with a .map-like
behavior:
- Given a transformation function f: T => U, one can transform a Source[T] to a Source[U] by creating a Source that...
- Given a listener l: Listener[U], creates a Listener[T] that applies f before passing to l...
- ... and passes that wrapping listener back to the Source[T].
And this is exactly what Source.transformValuesWith does.
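The three steps can be written out on a toy model. This is a sketch under assumptions rather than the Gears source: MiniListener, MiniSource, constant and miniTransformValuesWith are hypothetical names used only here, and the listener is reduced to a single offer method.

```scala
// Hypothetical, simplified stand-ins for Listener and Source (not the Gears types).
trait MiniListener[-T]:
  def offer(item: T): Boolean

trait MiniSource[+T]:
  def onComplete(listener: MiniListener[T]): Unit

/** A source that resolves every listener immediately with the same value. */
def constant[T](value: T): MiniSource[T] = new MiniSource[T]:
  def onComplete(listener: MiniListener[T]): Unit = listener.offer(value)

extension [T](source: MiniSource[T])
  /** Step 1: build a MiniSource[U] on top of a MiniSource[T]. */
  def miniTransformValuesWith[U](f: T => U): MiniSource[U] = new MiniSource[U]:
    def onComplete(l: MiniListener[U]): Unit =
      // Step 2: a listener for T that applies f before passing the result to l.
      val wrapping = new MiniListener[T]:
        def offer(item: T): Boolean = l.offer(f(item))
      // Step 3: hand the wrapping listener back to the original source.
      source.onComplete(wrapping)

def transformDemo(): Option[Int] =
  var seen: Option[Int] = None
  val recorder = new MiniListener[Int]:
    def offer(item: Int): Boolean =
      seen = Some(item)
      true
  constant("gears").miniTransformValuesWith(_.length).onComplete(recorder)
  seen // Some(5): the length of "gears"
```

Note that f runs inside offer, that is, inside whatever code the original source uses to notify its listeners.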
This looks like a very useful method, so why isn't it a basic concept, with a familiar name (.map)?
The devil lies in the details, specifically how Source notifies its Listeners.
Notice that there is no ExecutionContext or Async.Scheduler passed into onComplete.
This is because Sources often represent values that come from outside the program's control, possibly produced
on another thread pool. We do not want to introduce a "middle ground" where spawning tasks is possible but
implicitly lives outside of structured concurrency.
With this limitation, we have no control over where, or with how much parallelism and how many resources,
a Source calls a Listener.
In Gears, we attempt to contain this limitation by asking Listener implementations to be as lightweight as possible.
For most cases, Listener involves just scheduling the resumption of the suspended Async context, so it is not a problem.
However, transformValuesWith allows one to easily run unlimited computation within the transform function f.
This is dangerous, so transformValuesWith has a long, tedious, greppable name to show its gnarly side.
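To make the risk concrete, here is a small sketch in plain Scala that does not use Gears. A listener is modelled as a bare callback (the Callback alias and the transformed helper are hypothetical), and a thread named "producer-thread" plays the role of whatever resolves the source. The transform function ends up running on that thread, not on one the consumer chose.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in: a listener is just a callback here.
type Callback[T] = T => Unit

/** Wraps `callback` so that `f` is applied first, as a map-like transform would. */
def transformed[T, U](f: T => U)(callback: Callback[U]): Callback[T] =
  item => callback(f(item))

/** Returns the name of the thread on which the transform function executed. */
def whoRunsTheTransform(): String =
  val ranOn = new AtomicReference[String]("")
  val f: Int => Int = n =>
    ranOn.set(Thread.currentThread().getName) // record where f executes
    n * 2
  val sink: Callback[Int] = _ => ()
  val callback = transformed(f)(sink)
  // The "source" resolves the listener from its own thread.
  val task: Runnable = () => callback(21)
  val producer = new Thread(task, "producer-thread")
  producer.start()
  producer.join()
  ranOn.get() // "producer-thread"
```

If f were slow or blocking, it would stall the producer's thread for as long as it runs, which is exactly the kind of work a Listener is asked to avoid.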
I want to transform a Source, what do I do?
Explicitly spawn a Future that transforms each item and communicate with it through a Channel:
val ch = SyncChannel[U]()
Future:
  while true do
    // wait for the next item, transform it here, then hand it to whoever reads from ch
    val t = source.awaitResult
    ch.send(f(t))
This is, however, not always what you want: think about whether you want f and send to run at the same time,
or awaitResult and send, or what to do when f throws, and so on.
Most of the time this is very situation-specific, so Gears does not provide a simple solution.