Inter-future communication with Channels
For immutable data, it is easy and recommended to directly
capture references to them from within Future
s. However, to share mutable data
or to communicate between concurrent computations, Gears recommend following
Effective Go's slogan on sharing:
Do not communicate by sharing memory; instead, share memory by communicating.
To facillitate this, Gears provides channels, a simple mean of pipe-style communication between computations.
Channels
Channel
can be thought
of as a pipe between senders and readers, with or without a buffer in between. The latter
differentiates between the types of channels provided by Gears.
Nevertheless, all channels provide the following methods:
-
send
takes a valueT
and attempt to send it through the channel. Suspends until the value has been received or stored in the channel's buffer (if one exists).If the channel is closed,
send
will throwChannelClosedException
. -
read
suspends until a value is available through the channel, consuming it from the channel.read
returnsEither[Closed, T]
, withLeft(Closed)
returned in the obvious case. -
close
simply closes the channel.
Channels provide the following guarantees:
- Two sequential
send
s (onesend
followed by anothersend
, not done concurrently) will beread
in the same order. - An item that is
send
is alwaysread
exactly once, unless if it was in the channel's buffer when the channel is closed.
Types of channels
Gears provide 3 types of channels:
-
SyncChannel
are channels without a buffer, so everysend
will suspend until a correspondingread
succeeds.Since there are no buffers, an item that is
send
is alwaysread
exactly once. -
BufferedChannel
are channels with a buffer of a fixed size.send
s will succeed immediately if there is space in the buffer, and suspend otherwise, until there is space for it.When
cancel
led, items in the buffer are dropped. -
UnboundedChannel
are channels with a buffer that is infinitely growable.send
s always succeed immediately. In fact,UnboundedChannel
providessendImmediately
that never suspends.When
cancel
led, items in the buffer are dropped.
Restricted channel interfaces
When writing functions taking in channels, it is often useful to restrict them to be read-only or send-only.
Gears Channel
trait is composed of
ReadableChannel
,
SendableChannel
and
Closeable
, so any channel can be upcasted to one of the above three interfaces.
Async.select
with channels
Async.select
can be used with channels. To do so, use
.readSource
for reading and
.sendSource(T)
for sending; before
attaching .handle
and providing a handler.
Async.blocking:
val fut = Future:
AsyncOperations.sleep(1.minute)
10
val ch = SyncChannel[Int]()
val readFut = Future:
ch.read().right.get + 1
Async.select(
fut.handle: x => println(s"fut returned $x"),
ch.sendSource(20).handle:
case Left(Closed) => throw Exception("channel closed!?")
case Right(()) =>
println(s"sent from channel!")
println(s"readFut returned ${readFut.await}")
)
Similar to future handlers, Async.select
guarantees that exactly one of the handlers are run.
Not only so, it also guarantees that only the channel event with the handler that is run will go through!
For example, in the following snippet, it guarantees that exactly one channel is consumed from for each loop:
val chans = (1 to 2).map: i =>
val chan = UnboundedChannel()
chan.sendImmediately(i)
chan
for i <- 1 to 2 do
val read = Async.select(
chans(0).readSource.handle: v => v,
chans(1).readSource.handle: v => v,
)
println(read) // prints 1 2 or 2 1
It is possible to mix reads and sends within one Async.select
, or mix channel operations with futures!
Channel in Use: Actor pattern
One of the common patterns of using channels is to write functions that continuously take inputs from one channel, process them, and send their output into another channel. This is sometimes referred to as an Actor, or a Communicating Sequential Process.
Let's look at one of the examples, inspired by Rob Pike's talk on concurrency patterns in Go. This is an implementation of a concurrent prime sieve.
//> using scala 3.4.0
//> using dep "ch.epfl.lamp::gears::0.2.0"
//> using nativeVersion "0.5.1"
//> using nativeMode "release-full"
import java.io.Closeable
import scala.annotation.tailrec
import scala.util.boundary
import gears.async.*
import gears.async.default.given
// Sender that owns the channel
type OwnedSender[T] = SendableChannel[T] & Closeable
// Send the sequence 2, 3, 4, ..., until and then close the channel
def generate(until: Int)(ch: OwnedSender[Int])(using Async) =
for i <- 2 to until do ch.send(i)
ch.close()
// Filter out multiples of k
def sieve(
k: Int
)(in: ReadableChannel[Int], out: OwnedSender[Int])(using Async) =
@tailrec def loop(): Unit =
in.read() match
case Left(_) => ()
case Right(n) =>
if n % k != 0 then out.send(n)
loop()
loop()
out.close()
@main def PrimeSieve(n: Int) =
Async.blocking:
// set up sieves
val inputChan = SyncChannel[Int]()
// start generating numbers
Future(generate(n)(inputChan))
// Collect answers
@tailrec def loop(input: ReadableChannel[Int]): Unit =
input.read() match
// no more numbers
case Left(_) => ()
// new prime number
case Right(n) =>
println(s"$n is prime")
// filter out multiples of n
val chan = SyncChannel[Int]()
Future(sieve(n)(input, chan))
loop(chan)
loop(inputChan)
Some notable points:
generate
andsieve
are bothusing Async
functions, they block the caller until their work has finished. We want to run them concurrently as actors, we spawn them withFuture.apply
.- Both are given channels to write into, and the permission to close them. This is a common pattern, useful when your channels are single sender, multiple readers1. In such case, it is common for the sender to signal that there are no more inputs by simply closing the channel. All readers are then immediately notified, and in this case (where readers are senders themselves in a pipeline), they can forward this signal by closing their respective channels.
in the case where multiple senders are involved, the logic is much more complicated. You might want
to look into ChannelMultiplexer
.