I want to call an API in an Android app after collecting data from a Kotlin flow until any of these conditions is met:
Let's say I have a flow that emits ints:
emit 1
emit 2
emit 3
emit 4
emit 5
emit 6
emit 7
emit 8
delay 4000 millis
emit 9
emit 10
emit 11
emit 12
emit 13
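For reference, the emission sequence above could be written as the following flow. This is only a minimal sketch; the name `sampleFlow` is hypothetical and stands in for the real data source.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Hypothetical stand-in for the real data source: emits 1..8 back to back,
// pauses for 4000 ms, then emits 9..13 and completes.
fun sampleFlow(): Flow<Int> = flow {
    for (i in 1..8) emit(i)
    delay(4000)
    for (i in 9..13) emit(i)
}
```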
The expected result is:
call an api with value [1,2,3,4,5]....
call an api with value [6,7,8]....
call an api with value [9,10,11,12,13]....
Collection should continue like this until the activity is destroyed.
I don't think there is a way to achieve this with the built-in flow operations.
You can use a custom collect function like this, though:
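    import kotlinx.coroutines.DelicateCoroutinesApi
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.withTimeoutOrNull
    import kotlin.time.Duration

    // isClosedForReceive is marked as a delicate API, hence the opt-in.
    @OptIn(DelicateCoroutinesApi::class)
    suspend fun <T> Flow<T>.collectAggregatedWithTimeout(
        count: Int,
        timeout: Duration,
        block: suspend (List<T>) -> Unit,
    ) {
        val channel = Channel<T>(count)
        coroutineScope {
            // Collect the upstream flow in the background and forward each value to the channel.
            launch {
                collect { channel.send(it) }
                channel.close()
            }
            while (!channel.isClosedForReceive) {
                val list = mutableListOf<T>()
                // Fill the batch until it holds `count` items, the timeout fires, or the channel closes.
                withTimeoutOrNull(timeout) {
                    repeat(count) {
                        val result = channel.receiveCatching()
                        if (result.isClosed) return@withTimeoutOrNull
                        list.add(result.getOrThrow())
                    }
                }
                // Skip empty batches (a timeout with no new values, or a closed channel).
                if (list.isNotEmpty()) block(list)
            }
        }
    }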
The key is to let the regular collector run in the background and fill a channel with the collected values, while another coroutine reads the channel's values and calls block every count elements or after the timeout duration has elapsed. You can call it like this:
    flow.collectAggregatedWithTimeout(5, 3.seconds) {
        api.call(it)
    }
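If you would rather get the batches as a flow of lists, the same channel-based idea can be wrapped in an operator. The following is only a sketch of that variation; `chunkedWithTimeout` is a hypothetical name and not part of kotlinx.coroutines. As with the function above, the timeout is measured from the moment a batch window opens, not from the first element in it.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.time.Duration

// Hypothetical helper: emits lists of up to `count` items, flushing a partial
// list when `timeout` elapses. Empty batches are never emitted.
fun <T> Flow<T>.chunkedWithTimeout(count: Int, timeout: Duration): Flow<List<T>> = flow {
    coroutineScope {
        val channel = Channel<T>(count)
        // Child coroutine: forward upstream values, then close the channel.
        launch {
            this@chunkedWithTimeout.collect { channel.send(it) }
            channel.close()
        }
        var upstreamDone = false
        while (!upstreamDone) {
            val batch = mutableListOf<T>()
            withTimeoutOrNull(timeout) {
                repeat(count) {
                    val result = channel.receiveCatching()
                    if (result.isClosed) {
                        // Upstream completed: flush what we have and stop looping.
                        upstreamDone = true
                        return@withTimeoutOrNull
                    }
                    batch.add(result.getOrThrow())
                }
            }
            // Emit from the collecting coroutine itself, never from the child.
            if (batch.isNotEmpty()) emit(batch)
        }
    }
}
```

With this, the call site could become `flow.chunkedWithTimeout(5, 3.seconds).collect { api.call(it) }`. On Android, launching that collection in a lifecycle-bound scope such as `lifecycleScope` would stop it when the activity is destroyed.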