Locking and Fallible Listeners

As mentioned in the previous section, Source.dropListener is not a perfect solution for managing listeners. dropListener may run on a different thread than the one processing the Source's items, so calling dropListener before the listener is resolved does not guarantee that the listener will never be resolved.

However, sometimes we do want this guarantee. To provide it, Gears allows a listener to declare itself dead, and a Source that encounters a dead listener must not consume an item on that listener's behalf.

How does this work in detail? How do we declare a fallible listener, and how does a Source handle one?

The Listener trait

Let's look at the Listener trait in detail:

trait Listener[-T]:
  def complete(item: T, origin: Source[T]): Unit
  val lock: ListenerLock | Null

trait ListenerLock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit

We've seen the complete method before, in the Source section: it simply forwards the item to the listener, "completing" it.

However, a Listener may also declare that it has a lock, which must be acquired before calling complete. Most listeners are not fallible and do not require a lock; such a Listener simply declares lock = null.

The listener lock:

  • Returns a Boolean from its acquire method: failing to acquire the lock means that the listener is dead and should not be completed.

    • For performance reasons, these are blocking locks (acquire does not use Async), so Sources should hold them only briefly!
  • Has a release method: if the Source no longer has the item after acquiring the lock, it can release the lock without completing the Listener. release does not have to be called when the Listener is completed: the Listener releases the lock by itself! (See the diagram below)

  • Exposes a selfNumber, which is required to be unique for each Listener.

    This is used in the case where a Source needs to synchronize between multiple listeners, to prevent deadlocking.

    One can simply use a NumberedLock to implement ListenerLock with the appropriate numbering.
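The lock contract above can be sketched with a small fallible listener. This is a minimal illustration, not the Gears implementation: the traits are repeated as standalone stand-ins so that the snippet compiles on its own, and Numbering, OneShotListener, cancel and result are hypothetical names introduced only for this example.

```scala
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock

// Standalone stand-ins for the traits shown earlier (not the Gears definitions).
trait Source[+T]
trait ListenerLock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit
trait Listener[-T]:
  def complete(item: T, origin: Source[T]): Unit
  val lock: ListenerLock | Null

// Hypothetical helper that hands out a unique number per lock.
object Numbering:
  private val next = AtomicLong(0)
  def fresh(): Long = next.getAndIncrement()

// Hypothetical listener that accepts at most one item and can be cancelled.
final class OneShotListener[T] extends Listener[T]:
  private val mutex = ReentrantLock()
  private var dead = false               // guarded by mutex
  private var received: Option[T] = None // guarded by mutex

  val lock: ListenerLock = new ListenerLock:
    val selfNumber: Long = Numbering.fresh()
    def acquire(): Boolean =
      mutex.lock()
      if dead then
        mutex.unlock() // a dead listener never keeps the lock held
        false
      else true
    def release(): Unit = mutex.unlock()

  // Called by the Source while it holds the lock; the listener releases it.
  def complete(item: T, origin: Source[T]): Unit =
    received = Some(item)
    dead = true
    lock.release()

  // Declares the listener dead; returns true if it was still alive.
  def cancel(): Boolean =
    mutex.lock()
    try
      val wasAlive = !dead
      dead = true
      wasAlive
    finally mutex.unlock()

  def result: Option[T] =
    mutex.lock()
    try received
    finally mutex.unlock()
```

Because cancel and acquire take the same mutex, a Source either acquires the lock before the cancellation and completes the listener, or it sees acquire return false and keeps its item.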

Source and Listener interaction

The handshake process between a Source and a Listener is shown in the following diagram:

[Figure: Flow for a Source to process a listener]

However, if the Source's status for the item never changes (the item is always available), Listener.completeNow is a shortcut for the entire flow: it only reports whether the listener was alive and received the item.
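From the Source's side, the handshake can be sketched roughly as below. This is an illustration under assumptions rather than library code: the traits are standalone stand-ins, and SlotSource, offerTo, completeNowSketch, CollectingListener and DeadListener are hypothetical names. completeNowSketch only mimics the behavior described for Listener.completeNow; it is not the Gears implementation.

```scala
// Standalone stand-ins for the traits shown earlier (not the Gears definitions).
trait Source[+T]
trait ListenerLock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit
trait Listener[-T]:
  def complete(item: T, origin: Source[T]): Unit
  val lock: ListenerLock | Null

// Shortcut for an item that is always available: acquire, then complete.
def completeNowSketch[T](listener: Listener[T], item: T, origin: Source[T]): Boolean =
  val lock = listener.lock
  if lock != null && !lock.acquire() then false // dead listener
  else
    listener.complete(item, origin) // the listener releases its own lock
    true

// Hypothetical Source holding at most one item, which may disappear.
final class SlotSource[T] extends Source[T]:
  private var slot: Option[T] = None
  def put(item: T): Unit = synchronized { slot = Some(item) }

  // Returns true if the listener received the item.
  def offerTo(listener: Listener[T]): Boolean =
    val lock = listener.lock
    if lock != null && !lock.acquire() then false // dead: keep the item
    else
      // Re-check availability only after the lock is held.
      val taken = synchronized { val s = slot; slot = None; s }
      taken match
        case Some(item) =>
          listener.complete(item, this)
          true
        case None =>
          // The item is gone: hand the lock back without completing.
          if lock != null then lock.release()
          false

// Non-fallible listener: no lock, collects everything it is given.
final class CollectingListener[T] extends Listener[T]:
  var items: List[T] = Nil
  val lock: ListenerLock | Null = null
  def complete(item: T, origin: Source[T]): Unit =
    items = item :: items

// Listener that is always dead: its lock can never be acquired.
object DeadListener extends Listener[Any]:
  val lock: ListenerLock = new ListenerLock:
    val selfNumber: Long = -1L
    def acquire(): Boolean = false
    def release(): Unit = ()
  def complete(item: Any, origin: Source[Any]): Unit =
    throw IllegalStateException("a dead listener must not be completed")
```

Offering an item to DeadListener leaves it in the slot, so a later live listener can still receive it.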

The handshake process also allows for more complex scenarios involving synchronization between multiple listeners. For example, the implementation of SyncChannel requires both a sending and a receiving listener to be alive before it proceeds. We shall not go into details in this guide.
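Without going into SyncChannel itself, the role of selfNumber in such scenarios can be illustrated with a small sketch: when two locks must be held at once, always acquiring the one with the smaller number first means two Sources can never wait on each other in opposite orders. acquireBoth and RecordingLock are hypothetical names used only for this illustration and are not part of Gears.

```scala
import scala.collection.mutable

// Standalone stand-in for the trait shown earlier (not the Gears definition).
trait ListenerLock:
  val selfNumber: Long
  def acquire(): Boolean
  def release(): Unit

// Tries to hold both locks, acquiring the smaller selfNumber first.
// Returns false, holding neither lock, if either listener is dead.
def acquireBoth(a: ListenerLock, b: ListenerLock): Boolean =
  require(a.selfNumber != b.selfNumber, "selfNumber must be unique")
  val (first, second) =
    if a.selfNumber < b.selfNumber then (a, b) else (b, a)
  if !first.acquire() then false
  else if !second.acquire() then
    first.release() // do not keep one lock when the other listener is dead
    false
  else true

// Demo lock that records calls instead of blocking.
final class RecordingLock(
    val selfNumber: Long,
    alive: Boolean,
    log: mutable.Buffer[String]
) extends ListenerLock:
  def acquire(): Boolean =
    log += s"acquire $selfNumber"
    alive
  def release(): Unit =
    log += s"release $selfNumber"
```

A caller that gets true back from acquireBoth holds both locks and is then responsible for completing or releasing each listener, as in the single-listener flow.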