Tags: kotlin, concurrency, kotlin-coroutines, coroutine

Kotlin coroutines execute sequentially, but only on the production machine


I have a bunch of network requests I want to conduct in parallel.

The following pseudocode should give a good idea of what I'm doing right now:

runBlocking {
    buildList {
        withContext(tracer.asContextElement()) {
            items.forEach { item ->
                add(
                    async {
                        // a few IO intensive operations (i.e. network requests)
                    }
                )
            }
        }
    }.awaitAll()
}

I have tracing tools set up, and locally this seems to do the job. In my production infrastructure, however, the async tasks execute sequentially, i.e. the second one starts immediately after the first one finishes.

I have also tried using withContext(Dispatchers.IO.plus(tracer.asContextElement())) but I observe no difference.

The only thing I can say is that my development machine has multiple CPU cores, while my production machine typically has only one. Given the IO-heavy nature of these tasks, though, I doubt this is the problem. I can't really explain what is causing this, but my gut feeling is that I'm fundamentally misunderstanding something about how coroutines work in Kotlin.

As for the nature of the network requests in question: I'm using a third-party SDK that executes the requests asynchronously and seems to use ForkJoinPool.commonPool() under the hood as its executor.


Solution

    If you don't switch dispatchers here, all those coroutines run on the same thread: the one blocked by runBlocking. If the work inside each coroutine is blocking, the coroutines occupy that single thread one after another, with no way to run in parallel. This would explain what you're seeing (although it's strange that you can't reproduce it locally).
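
A minimal sketch of this effect, assuming kotlinx-coroutines-core is on the classpath. `blockingCall` is a hypothetical stand-in for one blocking network request (a 200 ms `Thread.sleep`). Because no dispatcher is given, every `async` child is dispatched to `runBlocking`'s single thread, so the calls should report one distinct thread and take roughly four times the latency of a single call.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for one blocking network call: it parks the calling
// thread for 200 ms and reports which thread it ran on.
fun blockingCall(): Thread {
    Thread.sleep(200)
    return Thread.currentThread()
}

// Runs `count` blocking calls with async inside a plain runBlocking and
// returns the threads they ran on plus the elapsed time in milliseconds.
fun sequentialDemo(count: Int): Pair<List<Thread>, Long> {
    val start = System.nanoTime()
    val threads = runBlocking {
        // No dispatcher is given, so every async child is dispatched to
        // runBlocking's event loop, which owns exactly one thread.
        (1..count).map { async { blockingCall() } }.awaitAll()
    }
    val elapsedMs = (System.nanoTime() - start) / 1_000_000
    return threads to elapsedMs
}

fun main() {
    val (threads, elapsedMs) = sequentialDemo(4)
    println("distinct threads: ${threads.toSet().size}, elapsed: $elapsedMs ms")
}
```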

    I have also tried using withContext(Dispatchers.IO.plus(tracer.asContextElement())) but I observe no difference.

    Your fix should work, unless the IO library you're calling manages its own threads and confines execution to a single thread no matter where it's called from. In that case, look into how the IO itself is executed.
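
To illustrate that caveat, here is a sketch with a hypothetical `SingleThreadSdk` that runs every request on one private worker thread (mimicking a pool whose parallelism is 1), assuming kotlinx-coroutines-core. Each coroutine runs on `Dispatchers.IO` and blocks its own IO thread with `get()`, yet the requests still queue behind the single worker, so the total time should stay close to `count` times the simulated latency.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical SDK that executes every request on its own single worker
// thread, no matter which thread the call comes from.
class SingleThreadSdk {
    private val worker: ExecutorService = Executors.newSingleThreadExecutor()

    fun requestAsync(item: Int): CompletableFuture<Int> =
        CompletableFuture.supplyAsync({
            Thread.sleep(200) // simulated network latency
            item
        }, worker)

    fun shutdown() = worker.shutdown()
}

// Calls the SDK from Dispatchers.IO, blocking on each future with get(),
// and returns the elapsed time in milliseconds.
fun confinedDemo(count: Int): Long {
    val sdk = SingleThreadSdk()
    try {
        val start = System.nanoTime()
        runBlocking(Dispatchers.IO) {
            (1..count).map { item ->
                // Each coroutine blocks its own IO thread, but all requests
                // still wait in line for the SDK's single worker.
                async { sdk.requestAsync(item).get() }
            }.awaitAll()
        }
        return (System.nanoTime() - start) / 1_000_000
    } finally {
        sdk.shutdown()
    }
}

fun main() {
    println("elapsed: ${confinedDemo(4)} ms")
}
```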

    EDIT: you mentioned that you perform the IO operations via a third-party SDK that uses the common ForkJoinPool. On a single-CPU machine that pool is backed by a single worker thread (its default parallelism is the CPU count minus one, with a minimum of 1), which explains why the calls aren't parallelized on your single-CPU production machine. The only options to fix that would be:

    1. check whether the SDK you're using lets you customize its backing thread pool
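    2. increase the parallelism of the common ForkJoinPool with the JVM system property java.util.concurrent.ForkJoinPool.common.parallelism (it is read when the common pool is initialized, so set it at JVM startup, e.g. -Djava.util.concurrent.ForkJoinPool.common.parallelism=N)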
    3. use another SDK :)
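
A JDK-only sketch of why the pool size matters, assuming the SDK submits its work as a `CompletableFuture` on some executor (an assumption; check how your SDK actually does it). `timeRequests` is a hypothetical helper that submits simulated 200 ms requests to a given executor: with one thread they should run back to back, with four threads they should overlap. `ForkJoinPool.getCommonPoolParallelism()` shows what the common pool is currently configured with.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.ForkJoinPool

// Hypothetical helper: submits `count` simulated 200 ms requests to
// `executor`, waits for all of them and returns the elapsed milliseconds.
fun timeRequests(executor: Executor, count: Int): Long {
    val start = System.nanoTime()
    val futures = (1..count).map { item ->
        CompletableFuture.supplyAsync<Int>({
            Thread.sleep(200) // simulated network latency
            item
        }, executor)
    }
    CompletableFuture.allOf(*futures.toTypedArray()).join()
    return (System.nanoTime() - start) / 1_000_000
}

fun main() {
    // Defaults to (CPU count - 1) with a minimum of 1, unless overridden at
    // JVM startup with -Djava.util.concurrent.ForkJoinPool.common.parallelism=N.
    println("common pool parallelism: ${ForkJoinPool.getCommonPoolParallelism()}")

    val oneThread = Executors.newFixedThreadPool(1)   // like the common pool on 1 CPU
    val fourThreads = Executors.newFixedThreadPool(4) // a pool sized for the IO
    try {
        println("1 thread:  ${timeRequests(oneThread, 4)} ms")
        println("4 threads: ${timeRequests(fourThreads, 4)} ms")
    } finally {
        oneThread.shutdown()
        fourThreads.shutdown()
    }
}
```

If the SDK accepts an `Executor`, passing it a pool like `fourThreads` corresponds to option 1; if it doesn't, option 2 raises the parallelism of the pool it already uses.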

    You still need to customize the dispatcher as well if you call the library in a blocking way, but not if you convert its async results into suspensions, e.g. with the await() extension for CompletableFuture/CompletionStage from kotlinx.coroutines.future, or a similar adapter.
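
A sketch of the non-blocking route, assuming the SDK hands back a `CompletableFuture` and that kotlinx-coroutines-core 1.7+ is used on the JVM (where the `kotlinx.coroutines.future.await` extension lives; older versions ship it in kotlinx-coroutines-jdk8). `sdkRequestAsync` is a hypothetical stand-in for the SDK call, and its pool is assumed to have been enlarged as in option 1. Since `await()` suspends instead of blocking, no `Dispatchers.IO` is needed: the single `runBlocking` thread starts every request, and they overlap on the SDK's own pool.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.concurrent.Executors

// Hypothetical SDK call: runs a simulated 200 ms request on `executor` and
// returns a CompletableFuture, as many asynchronous Java SDKs do.
fun sdkRequestAsync(item: Int, executor: Executor): CompletableFuture<Int> =
    CompletableFuture.supplyAsync({
        Thread.sleep(200)
        item
    }, executor)

// Issues `count` requests from a plain runBlocking and returns the elapsed
// time in milliseconds.
fun awaitDemo(count: Int): Long {
    // Stand-in for an SDK pool that has been sized for the workload.
    val sdkPool = Executors.newFixedThreadPool(count)
    try {
        val start = System.nanoTime()
        runBlocking { // no Dispatchers.IO: nothing in here blocks a thread
            (1..count).map { item ->
                // await() suspends this coroutine instead of blocking the
                // runBlocking thread, so all requests are in flight together.
                async { sdkRequestAsync(item, sdkPool).await() }
            }.awaitAll()
        }
        return (System.nanoTime() - start) / 1_000_000
    } finally {
        sdkPool.shutdown()
    }
}

fun main() {
    println("elapsed: ${awaitDemo(4)} ms")
}
```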


    Now, a few other things to note in this code:

    • you don't need buildList { ... }: use map { thing } instead of forEach { add(thing) } and you get the resulting list as the return value (this also works through withContext, because withContext returns the result of its lambda)

    • withContext waits for all of its child coroutines to finish before returning, so by the time awaitAll() runs, every Deferred has already completed; awaitAll() is misplaced here and belongs inside withContext

    • you probably don't need withContext at all: you can pass the custom context directly to runBlocking, unless runBlocking contains other work that you don't want to run in this context

    • (optional) if the IO computations don't return results, you don't need awaitAll at all and can use launch instead.
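
A small sketch of the first two points, assuming kotlinx-coroutines-core, with `CoroutineName` as a hypothetical substitute for `tracer.asContextElement()` so the example compiles without a tracing library. `withContext` returns the list produced by `map`, and by the time it returns, every `Deferred` in that list is already complete.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun withContextDemo(): Pair<Boolean, List<Int>> = runBlocking {
    // map returns the list of Deferreds and withContext returns its lambda's
    // result, so no buildList is needed to collect them.
    val deferreds: List<Deferred<Int>> = withContext(CoroutineName("tracing")) {
        (1..3).map { item ->
            async {
                delay(100) // simulated work
                item * 10
            }
        }
    }
    // withContext returns only after its child coroutines have completed,
    // so every Deferred is already done and awaitAll() has nothing to wait for.
    val allDone = deferreds.all { it.isCompleted }
    allDone to deferreds.awaitAll()
}

fun main() {
    println(withContextDemo())
}
```

Calling `withContextDemo()` returns `(true, [10, 20, 30])`.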

    Assuming you do need the results (so ignoring the point about launch), your current code with the dispatcher fix could be rewritten as:

    val results = runBlocking(Dispatchers.IO + tracer.asContextElement()) {
        items.map { item ->
            async {
                performIO(item) // the blocking network call(s) for this item
            }
        }.awaitAll() // results in the same order as items
    }
    

    Otherwise, if you don't need the results:

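    runBlocking(Dispatchers.IO + tracer.asContextElement()) {
        // forEach instead of map: the launched Jobs don't need to be collected,
        // since runBlocking waits for all of its children before returning
        items.forEach { item ->
            launch {
                performIO(item)
            }
        }
    }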
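
For completeness, a self-contained, runnable version of the results variant, assuming kotlinx-coroutines-core. `performIO` is a hypothetical blocking call (a 200 ms sleep) and `CoroutineName` is a placeholder for `tracer.asContextElement()`. `Dispatchers.IO` allows at least 64 threads regardless of the CPU count, which is why it lets blocking calls overlap even on a single-CPU machine, as long as the library underneath doesn't serialize them.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical blocking IO call standing in for the real network request.
fun performIO(item: Int): String {
    Thread.sleep(200) // simulated network latency
    return "result $item"
}

// CoroutineName stands in for tracer.asContextElement() so that this sketch
// compiles without a tracing library.
fun fetchAll(items: List<Int>): List<String> =
    runBlocking(Dispatchers.IO + CoroutineName("tracing")) {
        // Each async body blocks its own IO thread, so the calls overlap.
        items.map { item -> async { performIO(item) } }.awaitAll()
    }

fun main() {
    val start = System.nanoTime()
    val results = fetchAll(listOf(1, 2, 3, 4))
    val elapsedMs = (System.nanoTime() - start) / 1_000_000
    println("$results in $elapsedMs ms")
}
```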