Inter-future communication with Channels
For immutable data, it is easy and recommended to capture references
to it directly from within Futures. However, to share mutable data
or to communicate between concurrent computations, Gears recommends following
Effective Go's slogan on sharing:
Do not communicate by sharing memory; instead, share memory by communicating.
To facilitate this, Gears provides channels, a simple means of pipe-style communication between computations.
Channels
A channel can be thought
of as a pipe between senders and readers, with or without a buffer in between. Whether a buffer
exists, and how large it can grow, is what distinguishes the types of channels provided by Gears.
Nevertheless, all channels provide the following methods:
- send takes a value of type T and attempts to send it through the channel. It suspends until the value has been received or stored in the channel's buffer (if one exists). If the channel is closed, send throws ChannelClosedException.
- read suspends until a value is available through the channel, consuming it from the channel. read returns Either[Closed, T], with Left(Closed) returned if the channel is closed.
- close simply closes the channel.
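The three methods above can be seen together in a minimal sketch. It assumes the same Gears 0.2.0 setup used by the prime sieve example later in this chapter; `roundTrip` and `channelBasics` are hypothetical names chosen for illustration, not part of Gears.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import scala.annotation.tailrec
import gears.async.*
import gears.async.default.given

// roundTrip is a hypothetical helper, not part of Gears.
def roundTrip(): List[Int] =
  Async.blocking:
    val ch = SyncChannel[Int]()
    // The sender runs concurrently: each send suspends until it is read.
    Future:
      for i <- 1 to 3 do ch.send(i)
      ch.close()
    // Keep reading until read() reports Left(Closed).
    @tailrec def collect(acc: List[Int]): List[Int] =
      ch.read() match
        case Right(v) => collect(v :: acc)
        case Left(_)  => acc.reverse
    collect(Nil)

@main def channelBasics(): Unit =
  println(roundTrip()) // prints List(1, 2, 3)
```

The reader never needs to know how many items to expect: the sender signals the end by closing the channel, and the reader stops at the first Left.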
Channels provide the following guarantees:
- Two sequential sends (one send followed by another send, not done concurrently) will be read in the same order.
- An item that is sent is always read exactly once, unless it was in the channel's buffer when the channel was closed.
Types of channels
Gears provides three types of channels:
- SyncChannel is a channel without a buffer, so every send suspends until a corresponding read succeeds. Since there is no buffer, an item that is sent is always read exactly once.
- BufferedChannel is a channel with a buffer of a fixed size. A send succeeds immediately if there is space in the buffer, and otherwise suspends until space becomes available. When the channel is closed, items in the buffer are dropped.
- UnboundedChannel is a channel with a buffer that can grow without bound. sends always succeed immediately. In fact, UnboundedChannel provides sendImmediately, which never suspends. When the channel is closed, items in the buffer are dropped.
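To make the difference in buffering concrete, here is a small sketch under the same Gears 0.2.0 assumptions as the prime sieve example later in this chapter. `bufferedDemo` and `channelTypes` are hypothetical names used only for illustration.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given

// bufferedDemo is a hypothetical helper, not part of Gears.
def bufferedDemo(): List[Int] =
  Async.blocking:
    // A buffer of size 2 lets two sends complete before any read happens.
    val buffered = BufferedChannel[Int](2)
    buffered.send(1)
    buffered.send(2)
    // An unbounded channel never makes the sender wait.
    val unbounded = UnboundedChannel[Int]()
    unbounded.sendImmediately(3)
    // Drain the channels; values come out in the order they were sent.
    List(buffered.read(), buffered.read(), unbounded.read())
      .collect { case Right(v) => v }

@main def channelTypes(): Unit =
  println(bufferedDemo()) // prints List(1, 2, 3)
```

Replacing the BufferedChannel with a SyncChannel here would make the first send suspend forever, since no reader is running concurrently to receive the value.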
Restricted channel interfaces
When writing functions taking in channels, it is often useful to restrict them to be read-only or send-only.
Gears' Channel trait is composed of
ReadableChannel,
SendableChannel and
Closeable, so any channel can be upcast to any of these three interfaces.
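As a sketch of how these restricted interfaces might be used, the functions below each accept only the capability they need. `produce`, `sumOf` and `restrictedDemo` are hypothetical names for illustration, and the setup assumes Gears 0.2.0 as in the prime sieve example later in this chapter.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given

// Hypothetical producer: it can only send, never read or close.
def produce(out: SendableChannel[Int])(using Async): Unit =
  for i <- 1 to 3 do out.send(i)

// Hypothetical consumer: it can only read. A closed channel counts as 0.
def sumOf(count: Int)(in: ReadableChannel[Int])(using Async): Int =
  (1 to count).map(_ => in.read().getOrElse(0)).sum

def restrictedDemo(): Int =
  Async.blocking:
    val ch = SyncChannel[Int]() // a full channel
    Future(produce(ch))         // passed as SendableChannel[Int]
    sumOf(3)(ch)                // passed as ReadableChannel[Int]

@main def restrictedChannels(): Unit =
  println(restrictedDemo()) // prints 6
```

No explicit cast is needed at the call sites; the compiler performs the upcast, and the function bodies cannot call methods outside their interface.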
Async.select with channels
Async.select can be used with channels. To do so, use
.readSource for reading and
.sendSource(T) for sending, then
attach .handle and provide a handler.
import scala.concurrent.duration.*
import gears.async.*
import gears.async.Channel.Closed
import gears.async.default.given

Async.blocking:
  val fut = Future:
    AsyncOperations.sleep(1.minute)
    10
  val ch = SyncChannel[Int]()
  val readFut = Future:
    // read() returns Either[Closed, Int]; a value is expected here
    ch.read().toOption.get + 1

  Async.select(
    fut.handle: x => println(s"fut returned $x"),
    ch.sendSource(20).handle:
      case Left(Closed) => throw Exception("channel closed!?")
      case Right(()) =>
        println(s"sent from channel!")
        println(s"readFut returned ${readFut.await}")
  )
Similar to future handlers, Async.select guarantees that exactly one of the handlers is run.
Moreover, it guarantees that only the channel event whose handler is run actually goes through!
For example, the following snippet guarantees that exactly one channel is read from in each loop iteration:
val chans = (1 to 2).map: i =>
  val chan = UnboundedChannel[Int]()
  chan.sendImmediately(i)
  chan

for i <- 1 to 2 do
  val read = Async.select(
    chans(0).readSource.handle: v => v,
    chans(1).readSource.handle: v => v,
  )
  println(read) // prints Right(1) then Right(2), or Right(2) then Right(1)
It is possible to mix reads and sends within one Async.select, or mix channel operations with futures!
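A minimal sketch of mixing a read and a send in a single Async.select follows. It assumes Gears 0.2.0 as in the prime sieve example later in this chapter; `mixedSelect` and `mixedSelectDemo` are hypothetical names. Only the read can go through here, because nothing ever reads from the synchronous output channel.

```scala
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
import gears.async.*
import gears.async.default.given

// mixedSelect is a hypothetical helper, not part of Gears.
def mixedSelect(): String =
  Async.blocking:
    val in = UnboundedChannel[Int]()
    val out = SyncChannel[Int]()
    in.sendImmediately(1)
    Async.select(
      // A value is already waiting in `in`, so this case can fire.
      in.readSource.handle(v => s"read $v"),
      // `out` has no reader, so this send cannot complete.
      out.sendSource(2).handle(_ => "sent 2"),
    )

@main def mixedSelectDemo(): Unit =
  println(mixedSelect()) // prints read Right(1)
```

Because only the selected event goes through, the value 2 is never placed into the output channel.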
Channel in Use: Actor pattern
A common pattern when using channels is to write functions that continuously take inputs from one channel, process them, and send their output into another channel. This is sometimes referred to as an Actor, or a Communicating Sequential Process.
Let's look at one such example, inspired by Rob Pike's talk on concurrency patterns in Go. This is an implementation of a concurrent prime sieve.
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
//> using nativeMode "release-full"

import java.io.Closeable
import scala.annotation.tailrec
import scala.util.boundary

import gears.async.*
import gears.async.default.given

// Sender that owns the channel
type OwnedSender[T] = SendableChannel[T] & Closeable

// Send the sequence 2, 3, 4, ..., until and then close the channel
def generate(until: Int)(ch: OwnedSender[Int])(using Async) =
  for i <- 2 to until do ch.send(i)
  ch.close()

// Filter out multiples of k
def sieve(
    k: Int
)(in: ReadableChannel[Int], out: OwnedSender[Int])(using Async) =
  @tailrec def loop(): Unit =
    in.read() match
      case Left(_) => ()
      case Right(n) =>
        if n % k != 0 then out.send(n)
        loop()
  loop()
  out.close()

@main def PrimeSieve(n: Int) =
  Async.blocking:
    // set up sieves
    val inputChan = SyncChannel[Int]()
    // start generating numbers
    Future(generate(n)(inputChan))

    // Collect answers
    @tailrec def loop(input: ReadableChannel[Int]): Unit =
      input.read() match
        // no more numbers
        case Left(_) => ()
        // new prime number
        case Right(n) =>
          println(s"$n is prime")
          // filter out multiples of n
          val chan = SyncChannel[Int]()
          Future(sieve(n)(input, chan))
          loop(chan)

    loop(inputChan)
Some notable points:
- generate and sieve are both using Async functions, so they block the caller until their work has finished. Since we want to run them concurrently as actors, we spawn them with Future.apply.
- Both are given channels to write into, and the permission to close them. This is a common pattern, useful when your channels have a single sender and multiple readers [1]. In such a case, it is common for the sender to signal that there are no more inputs by simply closing the channel. All readers are then immediately notified, and in this case (where readers are senders themselves in a pipeline), they can forward this signal by closing their respective channels.

[1] In the case where multiple senders are involved, the logic is much more complicated. You might want
to look into ChannelMultiplexer.