Tags: kotlin, coroutine, kotlinx.coroutines, backpressure

Kotlin coroutines: concurrent execution throttling


Imagine we are reading messages from a message queue and, on receipt, submitting each one to a thread pool for processing. The pool has a limited number of threads, so when all of them are busy we get natural backpressure.

How can this be solved in the Kotlin coroutine world? If we create a coroutine for each incoming message, we can quickly run into out-of-memory errors (e.g. if each task has to load some data from the DB) and other issues.

Are there any mechanisms or patterns to solve this problem?


Solution

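  • One way to solve this is to create a Channel and send your data into it. Other coroutines can call consumeEach on the channel (or iterate over it with a for loop) to receive data from it. The channel's capacity can be tuned to how much work you want buffered: once the buffer is full, send suspends, which is what gives the producer backpressure.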

    The Fan-out and Fan-in examples in the coroutines docs can be helpful too.
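
A minimal sketch of the channel approach, assuming kotlinx.coroutines is on the classpath. The names handle and processThrottled, the worker count, and the capacity are made up for illustration, and delay stands in for real work such as a DB call. A fixed number of worker coroutines read from one bounded channel, so concurrency is capped by the number of workers, and the producer suspends in send whenever the buffer is full. The workers use a plain for loop rather than consumeEach, since the coroutines docs recommend the for loop when several coroutines share one channel.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for real message handling (e.g. loading data from a DB).
suspend fun handle(message: Int) {
    delay(10)
}

// Processes all messages with at most `workers` handlers running at once.
// Returns the highest number of handlers observed in flight, for demonstration only.
suspend fun processThrottled(messages: List<Int>, workers: Int, capacity: Int): Int =
    coroutineScope {
        val channel = Channel<Int>(capacity) // bounded buffer: send suspends when full
        val inFlight = AtomicInteger(0)
        val maxInFlight = AtomicInteger(0)

        // Fan-out: a fixed set of workers share the same channel.
        val jobs = List(workers) {
            launch {
                for (message in channel) {
                    val now = inFlight.incrementAndGet()
                    maxInFlight.updateAndGet { maxOf(it, now) }
                    handle(message)
                    inFlight.decrementAndGet()
                }
            }
        }

        // Producer: suspends here whenever workers and buffer are both full.
        for (message in messages) channel.send(message)
        channel.close() // lets the workers' for loops finish

        jobs.joinAll()
        maxInFlight.get()
    }

fun main() = runBlocking {
    val peak = processThrottled((1..20).toList(), workers = 3, capacity = 5)
    println("peak concurrency: $peak") // never more than the worker count
}
```

With this shape, at most workers + capacity messages are held in memory at any time (plus the one the producer is trying to send), which is the coroutine counterpart of a bounded thread pool with a bounded queue.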