Tags: kotlin, asynchronous, concurrency, parallel-processing, akka

Kotlin: Is there a tool that allows me to control parallelism when executing suspend functions?


I'm trying to execute a certain suspend function multiple times, in such a way that never more than N invocations are running at the same time.

For those acquainted with Akka and the Scala streaming libraries, I'm looking for something like mapAsync.

I wrote my own implementation using one input channel (as in Kotlin channels) and N output channels, but it seems cumbersome and not very efficient.

The code I'm currently using is somewhat like this:

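val inChannel = Channel<T>()
// One output channel per worker: exactly n of them (0..n would create n + 1)
val outChannels = (0 until n).map {
    Channel<T>()
}
launch {
    // Distribute incoming elements round-robin across the workers
    var i = 0
    for (t in inChannel) {
        // send suspends until the worker is ready; offer would drop the element instead
        outChannels[i].send(t)
        i = (i + 1) % n
    }
}
outChannels.forEach { outChannel ->
    launch {
        for (t in outChannel) {
            fn(t)
        }
    }
}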

Of course it has error management and everything, but still...

Edit: I did the following test, and it failed.

test("Parallelism is correctly capped") {
    val scope = CoroutineScope(Dispatchers.Default.limitedParallelism(3))
    var num = 0
    (1..100).map {
        scope.launch {
            num++
            println("started $it")
            delay(Long.MAX_VALUE)
        }
    }

    delay(500)
    assertEquals(3, num)
}

Solution

  • Your question, as asked, calls for @marstran's answer. If you want no more than N coroutines to be actively executing at any given time (that is, in parallel), then limitedParallelism is the way to go:

    val maxThreads: Int = TODO("some max number of threads")
    val limitedDispatcher = Dispatchers.Default.limitedParallelism(maxThreads)
    
    elements.forEach { elt ->
        scope.launch(limitedDispatcher) {
            doSomething(elt)
        }
    }
    

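    Now, if you also want to limit concurrency, so that at most N coroutines are in flight at once (potentially interleaved), regardless of threads, you could use a Semaphore instead. The distinction matters because limitedParallelism only caps how many coroutines occupy a thread at the same moment: a coroutine suspended in delay, for example, does not count against that limit, whereas a semaphore permit is held across suspension points.

    val maxConcurrency: Int = TODO("some max number of concurrent coroutines")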
    val semaphore = Semaphore(maxConcurrency)
    
    elements.forEach { elt ->
        scope.async {
            semaphore.withPermit {
                doSomething(elt)
            }
        }
    }
    

    You can also combine both approaches.
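As a rough illustration of combining the two, the sketch below wraps the Semaphore approach in a mapAsync-like helper and runs it on a limited dispatcher. The name mapConcurrently, the limits 2 and 3, and the in-flight counters are assumptions made up for this example; none of them are part of kotlinx.coroutines. On older library versions, limitedParallelism may also need an opt-in annotation.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not a library function): maps every element with
// `transform`, keeping at most `maxConcurrency` transforms in flight at once.
// Results come back in the same order as the input.
suspend fun <T, R> Iterable<T>.mapConcurrently(
    maxConcurrency: Int,
    transform: suspend (T) -> R,
): List<R> = coroutineScope {
    val semaphore = Semaphore(maxConcurrency)
    this@mapConcurrently.map { element ->
        // Every coroutine is created up front, but each one must acquire a
        // permit before calling transform, and holds it across suspensions.
        async { semaphore.withPermit { transform(element) } }
    }.awaitAll()
}

fun main() = runBlocking {
    // Assumed limits for the demo: at most 2 threads, at most 3 in flight.
    val limitedDispatcher = Dispatchers.Default.limitedParallelism(2)
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)

    val results = withContext(limitedDispatcher) {
        (1..20).mapConcurrently(maxConcurrency = 3) { n ->
            val now = inFlight.incrementAndGet()
            peak.updateAndGet { maxOf(it, now) }
            delay(50) // suspends and frees the thread, but the permit stays held
            inFlight.decrementAndGet()
            n * 2
        }
    }

    // The semaphore, not the dispatcher, is what bounds the in-flight count.
    check(peak.get() <= 3)
    println("peak in flight = ${peak.get()}, results = ${results.size}")
}
```

Here the dispatcher bounds how many threads do the work, while the semaphore bounds how many calls are in progress, including suspended ones. A test like the one in the question would be expected to pass against the semaphore limit rather than the dispatcher limit.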