Tags: android, kotlin, kotlin-coroutines, kotlin-flow

How can I collect data from a Kotlin Flow until it reaches a specific size or a specific time has passed, whichever condition is met first?


In an Android app, I want to collect values from a Kotlin Flow and call an API with the collected values as soon as either of these conditions is met:

  1. 5 values have been collected
  2. 3 seconds have passed

Let's say I have a flow of Int that emits like this:

emit 1
emit 2
emit 3
emit 4
emit 5
emit 6
emit 7
emit 8
delay 4000 millis
emit 9
emit 10
emit 11
emit 12
emit 13
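
For reference, the emission sequence above could be written as a real flow roughly like this. This is only a sketch: `sampleFlow` is a hypothetical name standing in for whatever actually produces the values in the app.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the real data source.
fun sampleFlow(): Flow<Int> = flow {
    for (i in 1..8) emit(i)   // first burst: 1..8
    delay(4000)               // quiet period, longer than the 3-second window
    for (i in 9..13) emit(i)  // second burst: 9..13
}

fun main() = runBlocking {
    // Collects everything without batching, just to show the raw sequence.
    println(sampleFlow().toList())
}
```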

The expected result is:

call an api with value [1,2,3,4,5]....
call an api with value [6,7,8]....
call an api with value [9,10,11,12,13]....

Collection should continue like this indefinitely, until the activity is destroyed.


Solution

  • I don't think there is a way to achieve this with the built-in flow operations.

    You can use a custom collect function like this, though:

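    import kotlin.time.Duration
    import kotlinx.coroutines.DelicateCoroutinesApi
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.withTimeoutOrNull

    // isClosedForReceive is marked as a delicate API, hence the opt-in.
    @OptIn(DelicateCoroutinesApi::class)
    suspend fun <T> Flow<T>.collectAggregatedWithTimeout(
        count: Int,
        timeout: Duration,
        block: suspend (List<T>) -> Unit,
    ) {
        val channel = Channel<T>(count)
    
        coroutineScope {
            // Producer: forward every upstream value into the channel, then close it.
            launch {
                collect { channel.send(it) }
                channel.close()
            }
    
            // Consumer: build one batch per iteration until the channel is closed and drained.
            while (!channel.isClosedForReceive) {
                val list = mutableListOf<T>()
    
                // Stop filling the batch after `count` values or once `timeout` elapses.
                withTimeoutOrNull(timeout) {
                    repeat(count) {
                        channel.receiveCatching().getOrNull()?.let {
                            list.add(it)
                        } ?: return@withTimeoutOrNull
                    }
                }
    
                // Nothing may have arrived within the window (or the channel was
                // closed), so skip empty batches instead of calling block with [].
                if (list.isNotEmpty()) block(list)
            }
        }
    }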
    

    The key is to run the regular collector in a background coroutine that fills a channel with the collected values, while a second coroutine reads from that channel and calls block with each batch once it holds count elements or the timeout duration has elapsed, whichever comes first. You can call it like this:

    // 3.seconds requires: import kotlin.time.Duration.Companion.seconds
    flow.collectAggregatedWithTimeout(5, 3.seconds) {
        api.call(it)
    }
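
To make the core idea from the explanation above more concrete, here is a minimal sketch of just the batching step: reading up to a fixed number of values from a channel and giving up when a timeout elapses. `receiveBatch` is a hypothetical helper introduced only for this illustration and is not part of kotlinx.coroutines. The demo scales the timings from the question down by a factor of ten (300 ms window, 400 ms pause) so it finishes quickly, and it restricts `T` to non-null types to keep the null check simple.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: read up to `count` values, stopping early when
// `timeout` elapses or the channel is closed.
suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(count: Int, timeout: Duration): List<T> {
    val batch = mutableListOf<T>()
    withTimeoutOrNull(timeout) {
        while (batch.size < count) {
            // getOrNull() returns null once the channel is closed and drained.
            val value = receiveCatching().getOrNull() ?: break
            batch.add(value)
        }
    }
    // Values received before the timeout are kept, because the list lives
    // outside the timed block.
    return batch
}

fun main() = runBlocking {
    val channel = Channel<Int>(5)

    // Producer: same emission pattern as in the question, with scaled-down timing.
    launch {
        for (i in 1..8) channel.send(i)
        delay(400)
        for (i in 9..13) channel.send(i)
        channel.close()
    }

    // Read three batches of at most 5 values, each with a 300 ms window.
    val batches = List(3) { channel.receiveBatch(5, 300.milliseconds) }
    println(batches)
}
```

With these timings the three batches should correspond to the groups in the question, but because the sketch relies on real delays, the exact split depends on scheduling and should not be treated as guaranteed.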