Tags: kotlin, openshift, kotlin-coroutines, ktor

Run several coroutines in a container deployed in OpenShift


I have the following Kotlin code deployed as a container in OpenShift:

fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)

@kotlin.jvm.JvmOverloads
fun Application.module() {

    launch { consumeProductionGeneratingUnits1hTopic() }
    launch { consumeProductionLargeGeneratingUnits1hTopic() }
    launch { consumeProductionAggregateProdType1hTopic() }

}

Each of the coroutines simply consumes from a Kafka topic in an infinite loop:

fun runCoroutine() {
    val consumer = buildConsumer("topic")
    while (true){
        val record = consumer.poll(Duration.ofSeconds(30))
        println(record.toString())
    }
}

When I run this code locally, all three coroutines are started. However, when I deploy and run the code as a container in OpenShift, only the first two coroutines are started. It looks like OpenShift supports a maximum of two coroutines.

Has anyone experienced something similar? I have tried reserving more CPU for the pod, but it does not change how the coroutines behave.


Solution

  • The first thing to note is that you're not using suspending functions, so each coroutine blocks the thread it runs on forever with its while(true) loop (consumer.poll() is itself a blocking call). Coroutines are cooperative: a thread can only switch to another coroutine when the current one reaches a suspension point.

    With the current implementation, if you're dispatching on a thread pool with only 2 threads, the first 2 coroutines will block both threads and the third coroutine will never run. Some coroutine dispatchers use a number of threads that depends on the number of available cores, which would explain the difference in behaviour between your local machine (likely more than 2 cores) and the containers (likely 2 cores or fewer).

    You didn't pass a dispatcher to launch, so the coroutines use whatever the enclosing scope provides. In Ktor, Application is itself a CoroutineScope, which is why launch compiles inside Application.module(). Unless a dispatcher has been configured for the application, this most likely means Dispatchers.Default, which on the JVM uses as many threads as there are CPU cores, but at least two.
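
    To see the effect without Kafka or OpenShift, here is a minimal sketch that assumes kotlinx.coroutines is on the classpath. The function name countStartedBlockingConsumers is hypothetical, Thread.sleep(50) stands in for the blocking consumer.poll(...), and a fixed pool of two threads simulates a dispatcher on a 2-core container. It counts how many blocking loops get started within one second.

    ```kotlin
    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.Executors
    import java.util.concurrent.atomic.AtomicBoolean
    import kotlinx.coroutines.*

    // Counts how many of `consumers` blocking loops start within one second
    // on a dispatcher that has only two threads (simulating a 2-core container).
    fun countStartedBlockingConsumers(consumers: Int): Int {
        val started = ConcurrentHashMap.newKeySet<Int>()
        val running = AtomicBoolean(true)
        val twoThreads = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
        try {
            return runBlocking {
                val jobs = (1..consumers).map { id ->
                    launch(twoThreads) {
                        started.add(id)
                        // No suspension point: this coroutine never gives its thread back
                        while (running.get()) {
                            Thread.sleep(50) // stands in for the blocking consumer.poll(...)
                        }
                    }
                }
                delay(1_000)                // ample time for every coroutine to start
                val startedInTime = started.size
                running.set(false)          // end the loops so the threads are released
                jobs.joinAll()
                startedInTime
            }
        } finally {
            twoThreads.close()              // shuts down the underlying thread pool
        }
    }

    fun main() {
        println("${countStartedBlockingConsumers(3)} of 3 consumers started")
    }
    ```

    Called with 3, it should report that only two consumers started; the third stays queued until one of the first two loops ends.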

    Solutions

    Of course, you could allocate more cores to your pod, but that only moves the limit: every blocking consumer still needs a thread of its own.

    Another option is to dispatch the consumers on a thread pool with more threads, but that also only moves the limit.
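
    For comparison, here is a sketch of the "more threads" option, again assuming kotlinx.coroutines is on the classpath. The same kind of hypothetical blocking loops (Thread.sleep(50) standing in for consumer.poll(...)) are launched on Dispatchers.IO, which is intended for blocking calls and allows up to 64 threads by default (or the number of cores, if that is larger). The function name countStartedOnIo is made up for illustration.

    ```kotlin
    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicBoolean
    import kotlinx.coroutines.*

    // Launches `consumers` blocking loops on Dispatchers.IO and counts
    // how many of them have started after one second.
    fun countStartedOnIo(consumers: Int): Int = runBlocking {
        val started = ConcurrentHashMap.newKeySet<Int>()
        val running = AtomicBoolean(true)
        val jobs = (1..consumers).map { id ->
            launch(Dispatchers.IO) {
                started.add(id)
                while (running.get()) {
                    Thread.sleep(50) // stands in for the blocking consumer.poll(...)
                }
            }
        }
        delay(1_000)                 // ample time for every coroutine to start
        val startedInTime = started.size
        running.set(false)           // end the loops so the IO threads are released
        jobs.joinAll()
        startedInTime
    }

    fun main() {
        println("${countStartedOnIo(3)} of 3 consumers started")
    }
    ```

    In the question's module this would amount to writing launch(Dispatchers.IO) { consumeProductionGeneratingUnits1hTopic() }. Every loop still holds one IO thread permanently, so this raises the limit rather than removing it.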

    A correct fix, IMO, would be to use asynchronous APIs wrapped as suspend functions. An easier, quicker fix is to keep your code as it is and add a call to yield() in the loop, so that the thread is freed from time to time for other coroutines:

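    import java.time.Duration
    import kotlinx.coroutines.yield

    // buildConsumer() is the helper from the question (not shown here)
    suspend fun runCoroutine() {
        val consumer = buildConsumer("topic")
        while (true) {
            // poll() still blocks the current thread for up to 30 seconds
            val record = consumer.poll(Duration.ofSeconds(30))
            println(record.toString())
            yield() // suspends so that other coroutines can run on this thread
        }
    }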
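
    The following minimal sketch shows the effect of yield() on a dispatcher with only two threads. It assumes kotlinx.coroutines is on the classpath; Thread.sleep(50) stands in for the blocking consumer.poll(...), and the function name countStartedWithYield is hypothetical.

    ```kotlin
    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.Executors
    import java.util.concurrent.atomic.AtomicBoolean
    import kotlinx.coroutines.*

    // Runs `consumers` loops on a two-thread dispatcher; each loop calls yield()
    // after every blocking "poll", so a waiting coroutine can take over the thread.
    fun countStartedWithYield(consumers: Int): Int {
        val started = ConcurrentHashMap.newKeySet<Int>()
        val running = AtomicBoolean(true)
        val twoThreads = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
        try {
            return runBlocking {
                val jobs = (1..consumers).map { id ->
                    launch(twoThreads) {
                        started.add(id)
                        while (running.get()) {
                            Thread.sleep(50) // stands in for the blocking consumer.poll(...)
                            yield()          // re-dispatches this coroutine, freeing the thread
                        }
                    }
                }
                delay(1_000)                // ample time for every coroutine to start
                val startedInTime = started.size
                running.set(false)          // end the loops so the threads are released
                jobs.joinAll()
                startedInTime
            }
        } finally {
            twoThreads.close()              // shuts down the underlying thread pool
        }
    }

    fun main() {
        println("${countStartedWithYield(3)} of 3 consumers started")
    }
    ```

    Note that each poll still blocks its thread for up to its timeout (30 seconds in the question), so with three consumers sharing two threads, a consumer may have to wait that long for its turn.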