Locking and Fallible Listeners
As mentioned in the previous section, Source.dropListener
is not a perfect solution for managing listeners:
dropListener may run on a different thread than the one processing the Source's items,
so calling dropListener before the listener is resolved
does not guarantee that the listener will never be resolved.
Sometimes, however, we do need this guarantee.
To provide it, Gears allows a listener to declare itself dead, and a Source that encounters a dead listener
should not pass the item to it, so the item is not consumed.
How does this work in detail? How do we declare a fallible listener, and how do Sources handle one?
The Listener trait
Let's look at the Listener trait in detail:
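trait Listener[-T]:
  // Forwards the item to the listener, "completing" it.
  def complete(item: T, origin: Source[T]): Unit
  // The lock to acquire before calling complete, or null if the listener cannot fail.
  val lock: ListenerLock | Null

trait ListenerLock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit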
We've seen the complete method before in the Source section: it simply
forwards the item to the listener, "completing" it.
However, a Listener may also declare that it has a lock,
which must be acquired before calling complete.
Most listeners, however, are not fallible and do not require a lock; in that case the Listener
just declares lock = null.
The listener lock:
- Returns a boolean from its acquire method: failing to acquire the lock means that the listener is dead and should not be completed.
  - For performance reasons, these are blocking locks (no using Async in acquire): Sources should be quick in using them!
- Has a release method: if the Source no longer has the item after acquiring the lock, the lock can be released without completing the Listener. release does not have to be called when the Listener is completed: the Listener will release the lock by itself! (See the diagram below)
- Exposes a selfNumber, which is required to be unique for each Listener. This is used when a Source needs to synchronize between multiple listeners, to prevent deadlocks. One can simply use a NumberedLock to implement ListenerLock with the appropriate numbering.
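To make the three points above concrete, here is a minimal sketch of a fallible listener. It is not the Gears implementation: the traits are simplified stand-ins for the ones shown earlier (Source is reduced to a marker trait), and CancellableListener, cancel and received are hypothetical names introduced only for this illustration. The sketch also assumes that acquire and the matching release or complete happen on the same thread, since it uses a ReentrantLock directly instead of NumberedLock.

```scala
import java.util.concurrent.locks.ReentrantLock

// Simplified stand-ins for the traits shown above (assumption: Source is only a marker here).
trait Source[+T]

trait ListenerLock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit

trait Listener[-T]:
  def complete(item: T, origin: Source[T]): Unit
  val lock: ListenerLock | Null

// Hypothetical fallible listener: it becomes dead once completed or cancelled.
final class CancellableListener[T](number: Long) extends Listener[T]:
  private val mutex = new ReentrantLock()
  private var dead = false             // guarded by mutex
  private var result: Option[T] = None // guarded by mutex

  val lock: ListenerLock = new ListenerLock:
    val selfNumber: Long = number
    def acquire(): Boolean =
      mutex.lock()
      if dead then
        mutex.unlock() // dead: give the mutex back and report failure
        false
      else true // alive: the caller now holds the mutex
    def release(): Unit = mutex.unlock()

  // Precondition: the caller holds `lock`. The listener releases it by itself.
  def complete(item: T, origin: Source[T]): Unit =
    result = Some(item)
    dead = true
    mutex.unlock()

  // Declares the listener dead; returns true if it was still alive.
  def cancel(): Boolean =
    mutex.lock()
    try
      val wasAlive = !dead
      dead = true
      wasAlive
    finally mutex.unlock()

  // The item this listener was completed with, if any.
  def received: Option[T] =
    mutex.lock()
    try result
    finally mutex.unlock()
```

Because cancel and acquire go through the same mutex, a successful cancel guarantees that no later acquire succeeds, which is the guarantee that dropListener alone cannot give.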
Source and Listener interaction
The diagram for the handshake process between a Source and a Listener is as follows:

However, if the Source's status for the item never changes (the item is always available),
Listener.completeNow
is a shortcut for the entire flow: it only tells you whether the listener was alive and
received the item.
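The handshake described in this section can be sketched in code as follows. This is an illustration under stated assumptions, not the Gears source: the traits are simplified stand-ins, SlotSource, offerTo, Collect and completeNowSketch are hypothetical names, and completeNowSketch only mirrors the behaviour described in the text for completeNow.

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified stand-ins for the traits shown earlier (assumption: Source is only a marker here).
trait Source[+T]

trait ListenerLock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit

trait Listener[-T]:
  def complete(item: T, origin: Source[T]): Unit
  val lock: ListenerLock | Null

// Hypothetical one-slot source: the item may have been taken by the time a lock is acquired.
final class SlotSource[T] extends Source[T]:
  private val slot = new AtomicReference[Option[T]](None)

  def put(item: T): Unit = slot.set(Some(item))

  // Returns true if the listener received the item.
  def offerTo(listener: Listener[T]): Boolean =
    val lock = listener.lock
    if lock != null && !lock.acquire() then false // dead listener: the item is kept
    else
      slot.getAndSet(None) match
        case Some(item) =>
          listener.complete(item, this) // the listener releases its own lock
          true
        case None =>
          if lock != null then lock.release() // item is gone: release without completing
          false

// Shortcut for an item that is always available: no re-check is needed after locking.
def completeNowSketch[T](listener: Listener[T], item: T, origin: Source[T]): Boolean =
  val lock = listener.lock
  if lock == null || lock.acquire() then
    listener.complete(item, origin)
    true
  else false

// A listener that cannot fail, so it declares no lock.
final class Collect[T] extends Listener[T]:
  val lock: ListenerLock | Null = null
  var items: List[T] = Nil
  def complete(item: T, origin: Source[T]): Unit = items = item :: items
```

Note that offerTo only removes the item from the slot after the lock has been acquired, so a dead listener never causes an item to be lost.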
The handshake process also allows for more complex scenarios that involve synchronizing multiple listeners.
For example,
the implementation of SyncChannel
requires both a sending and a receiving listener to be alive in order to proceed.
We will not go into the details in this guide.
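Without reproducing how SyncChannel actually does it, the role of selfNumber in such multi-listener scenarios can be illustrated in isolation. The sketch below is an assumption-laden example: ListenerLock is a stand-in for the trait shown earlier, and lockBothSketch is a hypothetical helper. It always acquires the lock with the smaller selfNumber first, so two threads locking the same pair of listeners cannot wait on each other in a cycle.

```scala
// Stand-in for the ListenerLock trait shown earlier.
trait ListenerLock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit

// Hypothetical helper: tries to hold both locks at once.
// Returns true only if both listeners are alive; on failure, no lock is left held.
def lockBothSketch(a: ListenerLock, b: ListenerLock): Boolean =
  // A global order on selfNumber means every thread locks in the same sequence.
  val (first, second) = if a.selfNumber <= b.selfNumber then (a, b) else (b, a)
  if !first.acquire() then false
  else if !second.acquire() then
    first.release() // the partner is dead: do not keep the live listener locked
    false
  else true
```

When lockBothSketch returns true, the caller holds both locks and is expected to either complete or release each listener, following the same handshake as in the single-listener case.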