Search code examples
kotlinkotlin-coroutineskotlinx.coroutines.channels

How to close the channel after all producer coroutines are done?


Consider the following code:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        channel.send("A1")
        channel.send("A2")
        log("A done")
    }
    launch {
        channel.send("B1")
        log("B done")
    }
    launch {
        for (x in channel) {
            log(x)
        }
    }
}

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

The original version has the receiver coroutine like that:

launch {
        repeat(3) {
            val x = channel.receive()
            log(x)
        }
    }

It expects only 3 messages in the channel. If I change it to the first version then I need to close the channel after all producer coroutines are done. How can I do that?


Solution

  • A possible solution is to create a job that will wait for all channel.send() to finish, and call channel.close() in the invokeOnCompletion of this job:

    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.*
    
    fun main() = runBlocking<Unit> {
        val channel = Channel<String>()
        launch {
          launch {
              channel.send("A1")
              channel.send("A2")
              log("A done")
          }
          launch {
              channel.send("B1")
              log("B done")
          }
        }.invokeOnCompletion {
            channel.close()
        }
        launch {
            for (x in channel) {
                log(x)
            }
        }
    }
    
    fun log(message: Any?) {
        println("[${Thread.currentThread().name}] $message")
    }