Tags: kotlin, concurrency, throttling, kotlin-flow, debouncing

Key-based debounce in Kotlin Flows


I have a flow of key-value pairs (e.g. Flow<Pair<String, Int>>) that I would like to debounce/throttle per key, so that when several pairs with the same key arrive in quick succession, only the last one is emitted downstream. For example:

val throttled = flow {
    emit("A" to 1)    // emits at t=0
    delay(100)
    emit("B" to 6)    // emits at t=100
    delay(100)
    emit("A" to 7)    // emits at t=200
    delay(100)
    emit("A" to 2)    // emits at t=300
    delay(100)
    emit("B" to 8)    // emits at t=400
    delay(300)
    emit("A" to 3)    // emits at t=700
}
    .throttle(201.milliseconds)
    // ...

should produce the following downstream: (B, 6), (A, 2), (B, 8), (A, 3).

Basically, the timeout should reset whenever a new value for the same key arrives, and the pair should be sent downstream only after the specified duration has passed without another value for that key. Values can arrive sporadically, but no pair should be held back indefinitely waiting for other pairs to arrive.
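To make the expected behaviour concrete, here is a minimal sketch that replays the timeline from the example without any coroutines. `Timed` and `expectedDebounced` are hypothetical names used only for illustration, the input is assumed to be sorted by arrival time, and treating a gap exactly equal to the timeout as "survives" is an assumption.

```kotlin
// Hypothetical helper type: one event with its arrival time in milliseconds.
data class Timed<K, V>(val atMillis: Long, val key: K, val value: V)

// Computes which pairs a per-key debounce is expected to let through.
// Assumes `events` is sorted by arrival time.
fun <K, V> expectedDebounced(events: List<Timed<K, V>>, timeoutMillis: Long): List<Pair<K, V>> =
    events
        .filterIndexed { i, e ->
            // Look for the next event that carries the same key, if any.
            val next = events.drop(i + 1).firstOrNull { it.key == e.key }
            // The pair survives if no same-key event arrives before its timer expires
            // (assumption: a gap equal to the timeout counts as expired).
            next == null || next.atMillis - e.atMillis >= timeoutMillis
        }
        // Every survivor is emitted at arrival + timeout, so arrival order is emission order.
        .map { it.key to it.value }

fun main() {
    val events = listOf(
        Timed(0, "A", 1),
        Timed(100, "B", 6),
        Timed(200, "A", 7),
        Timed(300, "A", 2),
        Timed(400, "B", 8),
        Timed(700, "A", 3),
    )
    println(expectedDebounced(events, 201)) // prints [(B, 6), (A, 2), (B, 8), (A, 3)]
}
```

With a 201 ms timeout, (A, 1) and (A, 7) are each superseded by another A within 200 ms or less, while (B, 6) has 300 ms of quiet before (B, 8) arrives, so it goes through.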

Is it possible to achieve this with the current Flow API?


Solution

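  • You can try this approach with channelFlow. Each incoming pair cancels the pending job for its key and launches a new one that sends the pair after the timeout: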

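    import kotlinx.coroutines.Job
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.channelFlow
    import kotlinx.coroutines.launch

    fun <K, V> Flow<Pair<K, V>>.debouncePerKey(timeout: Long): Flow<Pair<K, V>> {
        return channelFlow {
            // One pending emission job per key.
            val pendingJobs = mutableMapOf<K, Job>()

            collect { (key, value) ->
                // A newer value for this key supersedes the pending one.
                pendingJobs[key]?.cancel()

                // Send the pair only if no newer value for the key arrives within `timeout` ms.
                pendingJobs[key] = launch {
                    delay(timeout)
                    send(key to value)
                }
            }
            // channelFlow completes only after its child jobs finish,
            // so pairs that are still pending when upstream ends are emitted too.
        }
    }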
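To check the approach against the sample from the question, here is a self-contained sketch. It assumes a recent kotlinx.coroutines version (1.6 or later) and uses a hypothetical `Duration`-based variant of `debouncePerKey`, so it can be called the way the question calls `.throttle(...)`. `sampleFlow` and `collectSample` are illustrative names. The timeout is 250 ms rather than the question's 201 ms, purely to leave some margin for real-clock jitter under `runBlocking`.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical Duration-based variant of the per-key debounce (an assumption, not a library API).
fun <K, V> Flow<Pair<K, V>>.debouncePerKey(timeout: Duration): Flow<Pair<K, V>> =
    channelFlow {
        val pendingJobs = mutableMapOf<K, Job>() // one pending emission per key
        collect { (key, value) ->
            pendingJobs[key]?.cancel()           // a newer value resets the key's timer
            pendingJobs[key] = launch {
                delay(timeout)
                send(key to value)               // no newer value arrived in time
            }
        }
    }

// The sample input from the question.
fun sampleFlow(): Flow<Pair<String, Int>> = flow {
    emit("A" to 1)
    delay(100)
    emit("B" to 6)
    delay(100)
    emit("A" to 7)
    delay(100)
    emit("A" to 2)
    delay(100)
    emit("B" to 8)
    delay(300)
    emit("A" to 3)
}

// Runs the sample through the operator and gathers everything emitted downstream.
fun collectSample(): List<Pair<String, Int>> = runBlocking {
    sampleFlow().debouncePerKey(250.milliseconds).toList()
}

fun main() {
    // On an unloaded machine this should print [(B, 6), (A, 2), (B, 8), (A, 3)].
    println(collectSample())
}
```

Because the delays run on the real clock, a timeout only 1 ms above an upstream gap (as with 201 ms) can be flaky. For deterministic checks, running the same code under virtual time with `runTest` from kotlinx-coroutines-test is an option.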