Search code examples
kotlinkotlinx.coroutines

Consumable channel


Use Case

Android fragment that consumes items of T from a ReceiveChannel<T>. Once consumed, the Ts should be removed from the ReceiveChannel<T>.

I need a ReceiveChannel<T> that supports consuming items from it. It should function as a FIFO queue.

I currently attach to the channel from my UI like:

launch(uiJob) { channel.consumeEach{ /** ... */ } }

I detach by calling uiJob.cancel().

Desired behavior:

val channel = Channel<Int>(UNLIMITED)

channel.send(1)
channel.send(2)
// ui attaches, receives `1` and `2`
channel.send(3) // ui immediately receives `3`
// ui detaches
channel.send(4)
channel.send(5)
// ui attaches, receiving `4` and `5`

Unfortunately, when I detach from the channel, the channel is closed. This causes .send(4) and .send(5) to throw exceptions because the channel is closed. I want to be able to detach from the channel and have it remain usable. How can I do this?

Channel<Int>(UNLIMITED) fits my use case perfect, except that is closes the channel when it is unsubscribed from. I want the channel to remain open. Is this possible?


Solution

  • Channel.consumeEach method calls Channel.consume method which has this line in documentation:

    Makes sure that the given block consumes all elements from the given channel by always invoking cancel after the execution of the block.

    So the solution is to simply not use consume[Each]. For example you can do:

    launch(uiJob) { for (it in channel) { /** ... */ } }