Tags: kotlin, kotlinx.coroutines

How do I push data from a callback to a Kotlin coroutine?


Suppose I want to use a cache in a DownloadQueue-like example. The joy of coroutines there is that a simple data structure (e.g. HashMap) can be used in a single-threaded algorithm.

However, I want to use a size-limited cache and evict the extra entries to slower persistent storage. For instance, I can use Guava's CacheBuilder with a RemovalListener: https://github.com/google/guava/wiki/CachesExplained#removal-listeners

Here's the problem: RemovalListener is a non-suspending callback, so it cannot call suspending functions such as Channel.send to push data to a channel.

// This is placed in a suspend context (e.g. within `actor`)
val cache = CacheBuilder.newBuilder().maximumSize(1000)
    .expireAfterWrite(10, TimeUnit.SECONDS)
    .removalListener<Key, Value> { n ->
        expireChannel.sendBlocking(n.value!!) // <-- could sendBlocking be used here???
    }.build<Key, Value>()

As far as I understand, sendBlocking relies on runBlocking, which blocks the current thread. That blocks the current coroutine as well, so it won't be able to receive or handle the data.

Is there a way to make RemovalListener-like callbacks coroutine-friendly?

I can imagine a workaround of using an unbounded queue that is populated by a RemovalListener while the coroutine in question eventually checks the queue.
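
The workaround described above could be sketched roughly as follows. This is only an illustration under assumptions: it uses a ConcurrentLinkedQueue as the unbounded queue, and `evicted`, `onRemoved` and `drainEvicted` are hypothetical names standing in for the RemovalListener body and the coroutine's periodic check.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Unbounded, thread-safe queue: adding to it never blocks the callback.
val evicted = ConcurrentLinkedQueue<String>()

// Hypothetical stand-in for the body of a RemovalListener.
fun onRemoved(value: String) {
    evicted.add(value)
}

// Meant to be called by the coroutine whenever convenient,
// e.g. once per processed message. Returns everything queued so far.
fun drainEvicted(): List<String> {
    val batch = mutableListOf<String>()
    while (true) {
        val value = evicted.poll() ?: break // null means the queue is empty
        batch += value
    }
    return batch
}
```

The trade-off is that nothing limits the queue size, so a slow consumer lets memory grow without bound.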


Solution

  • The only way to avoid blocking the removal listener (and thus breaking the cache) is to use a buffered channel. This means two changes:

    1. Create the channel with a non-zero capacity; see Channel(capacity) for the available behaviors. The right choice mostly depends on what should happen when more items expire than the consumer can handle.
    2. Use SendChannel.offer() instead of SendChannel.sendBlocking(). If it returns false, you can implement a fallback such as logging the failure, which helps you find the best balance between memory use and lost events. (Since kotlinx.coroutines 1.5, offer() is deprecated in favor of trySend(), which returns a ChannelResult instead of a Boolean.)
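
The two changes above can be sketched as follows. This is a minimal illustration, not a definitive implementation: it assumes Guava and kotlinx.coroutines 1.5 or later are on the classpath and uses trySend() in place of the deprecated offer(). The function name `demoEviction`, the String key/value types, the capacity of 64 and the maximum size of 2 are all hypothetical choices made for the example.

```kotlin
import com.google.common.cache.CacheBuilder
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns the values that reached the consumer coroutine.
fun demoEviction(): List<String> = runBlocking {
    // Change 1: a buffered channel (capacity 64 is an arbitrary choice).
    val expireChannel = Channel<String>(capacity = 64)
    val persisted = mutableListOf<String>()

    // Consumer coroutine: stands in for writing to the slower storage.
    val consumer = launch {
        for (value in expireChannel) persisted += value
    }

    val cache = CacheBuilder.newBuilder()
        .maximumSize(2) // tiny limit so that evictions happen quickly
        .removalListener<String, String> { n ->
            val value = n.value ?: return@removalListener
            // Change 2: trySend never suspends or blocks; it fails when the buffer is full.
            val result = expireChannel.trySend(value)
            if (result.isFailure) {
                // Fallback: in this sketch the event is only logged and then lost.
                System.err.println("Expire buffer full, dropped: $value")
            }
        }.build<String, String>()

    // Writes can trigger eviction; the listener then runs on the writing thread.
    for (i in 1..5) cache.put("key$i", "value$i")
    cache.cleanUp() // ask Guava to perform any pending maintenance now

    expireChannel.close() // buffered elements are still delivered after close
    consumer.join()
    persisted
}
```

Because the listener only calls trySend(), it returns immediately even when the consumer is behind; what to do with a failed send (log, count, or spill elsewhere) is the design decision that the choice of capacity forces.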