Search code examples
kotlinkotlinx.coroutines

Kotlin wait for Channel.isClosedForReceive


After calling Channel.close() what is the best way to wait for Channel.isClosedForReceive to be true?

I'm processing messages in order and would like to return the max message processed after calling Channel.close(). However, if I just take the max processed message after calling close() there may be some messages in the channel before the "close token" is consumed causing the real max processed message to be greater then the value returned.

Based on the doc for Channel.close() I think Channel.isClosedForReceive is what I should be waiting for. But I was expecting some suspend function to wait, rather then having to poll its status.

/**
 * Immediately after invocation of this function
 * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
 * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
 * are received.
 **/

Solution

  • Let's assume that you process all your messages in some kind of an actor:

    val channel = actor<Message> { ... }
    

    The best way is to learn the last message that this actor processed is to create an explicit back channel to communicate this information. Since we need to communicate only one value just once we'd use CompletableDeferred this purpose:

    val lastProcessed = CompletableDeferred<Message?>() 
    

    Now, you can write your message-processing actor in the following way:

    val actor = actor<Message> {
        var last: Message? = null
        try {
            for (msg in channel) {
                // process message
                last = msg
            }
        } finally {
            // report the last processed message via back channel
            lastProcessed.complete(last)
        }
    }
    

    Here is the code that closes the channel and learns the last processed message:

    actor.close()
    val last = lastProcessed.await() // receive last processed message