I have a flow of key-value pairs (e.g. Flow<Pair<String, Int>>) that I would like to debounce/throttle per key, so that when several pairs with the same key arrive in quick succession, only the last one is emitted downstream. For example:
val throttled = flow {
    emit("A" to 1) // emits at t=0
    delay(100)
    emit("B" to 6) // emits at t=100
    delay(100)
    emit("A" to 7) // emits at t=200
    delay(100)
    emit("A" to 2) // emits at t=300
    delay(100)
    emit("B" to 8) // emits at t=400
    delay(300)
    emit("A" to 3) // emits at t=700
}
    .throttle(201.milliseconds)
    // ...
should produce downstream: (B, 6), (A, 2), (B, 8), (A, 3).
Basically, the timeout for a key should "reset" whenever a new value for that key arrives, and the pair should be sent downstream only after the specified duration has passed without another value for the same key. Values can arrive sporadically, but no pair should be left behind waiting for other pairs to come in.
Is it possible to achieve this with the current Flow API?
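You can try this with channelFlow. Each new value for a key cancels that key's pending job and launches a new one that sends the pair after the timeout. channelFlow waits for the launched jobs to finish before it completes, so the last pairs are not dropped: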
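import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch

fun <K, V> Flow<Pair<K, V>>.debouncePerKey(timeout: Long): Flow<Pair<K, V>> {
    return channelFlow {
        // Pending delayed send for each key.
        val pendingJobs = mutableMapOf<K, Job>()
        collect { (key, value) ->
            // A newer value for this key supersedes the one still waiting.
            pendingJobs[key]?.cancel()
            pendingJobs[key] = launch {
                delay(timeout) // timeout in milliseconds
                send(key to value)
            }
        }
    }
}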
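To check the per-key reset rule against the timeline from the question without running any coroutines, here is a minimal sketch that replays timestamped events in plain Kotlin. The names `Timed` and `simulateDebouncePerKey` are hypothetical helpers made up for this illustration and are not part of kotlinx.coroutines. It assumes that a value arriving exactly at the deadline does not cancel the earlier one, which a real coroutine-based version may resolve differently.

```kotlin
// Hypothetical helper type for illustration: an event (or emission) with a timestamp.
data class Timed<K, V>(val atMillis: Long, val key: K, val value: V)

/**
 * Replays a time-ordered list of events and returns what a per-key debounce
 * with the given timeout would send downstream, ordered by emission time.
 */
fun <K, V> simulateDebouncePerKey(
    events: List<Timed<K, V>>,
    timeoutMillis: Long
): List<Timed<K, V>> {
    val emitted = mutableListOf<Timed<K, V>>()
    events.forEachIndexed { index, event ->
        val deadline = event.atMillis + timeoutMillis
        // An event is dropped if a later event with the same key
        // arrives strictly before its deadline (assumption: ties emit).
        val superseded = events.drop(index + 1)
            .any { it.key == event.key && it.atMillis < deadline }
        if (!superseded) emitted += Timed(deadline, event.key, event.value)
    }
    return emitted.sortedBy { it.atMillis }
}

fun main() {
    // The timeline from the question.
    val events = listOf(
        Timed(0L, "A", 1),
        Timed(100L, "B", 6),
        Timed(200L, "A", 7),
        Timed(300L, "A", 2),
        Timed(400L, "B", 8),
        Timed(700L, "A", 3),
    )
    simulateDebouncePerKey(events, 201L)
        .forEach { println("t=${it.atMillis}: (${it.key}, ${it.value})") }
    // Prints:
    // t=301: (B, 6)
    // t=501: (A, 2)
    // t=601: (B, 8)
    // t=901: (A, 3)
}
```

The last pair, (A, 3), is sent at t=901 even though the upstream finished at t=700, which matches the requirement that no pair is left behind.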