
Kotlin SharedFlow combine operation: zip behaviour in a specific situation


I'm combining two SharedFlows and then performing a long-running operation.

At the start I know the state, so I emit a "starting value" to both flows. After that, the user can emit to either flow.

Both flows are mostly independent, but in one specific situation the user emits to both flows at the same time. This triggers combine twice, so the long-running job is performed twice, when in that case I only want to receive both values and run the job once.

Here is what I have:

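import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.runBlocking

fun main() {
    val _numbers = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val numbers: SharedFlow<Int> = _numbers
    val _strings = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val strings: SharedFlow<String> = _strings

    // combine with two flows takes a two-parameter lambda, not a destructured pair
    combine(numbers, strings) { number, string ->
        println("values $number - $string. Starting to perform a long working job")
    }
        .launchIn(CoroutineScope(Dispatchers.IO))

    runBlocking {

        delay(500)
        // These are the initial values. I always know them at the start.
        _numbers.emit(0)
        _strings.emit("a")

        // Depending on the user action, a number or a string is emitted.

        delay(100)
        _numbers.emit(1)
        delay(100)
        _numbers.emit(2)
        delay(100)
        _numbers.emit(3)
        delay(100)
        _numbers.emit(4)
        delay(100)
        _strings.emit("b")
        delay(100)
        _strings.emit("c")
        delay(100)
        _strings.emit("d")
        delay(100)
        _strings.emit("e")
        delay(100)

        // In a specific situation both values need to change, but I only want to trigger the long working job once
        _numbers.emit(10)
        _strings.emit("Z")
        delay(100) // give the collector time to print before main returns
    }
}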

This can produce this:

values 0 - a. Starting to perform a long working job
values 1 - a. Starting to perform a long working job
values 2 - a. Starting to perform a long working job
values 3 - a. Starting to perform a long working job
values 4 - a. Starting to perform a long working job
values 4 - b. Starting to perform a long working job
values 4 - c. Starting to perform a long working job
values 4 - d. Starting to perform a long working job
values 4 - e. Starting to perform a long working job
values 10 - e. Starting to perform a long working job
values 10 - Z. Starting to perform a long working job

Or this:

values 0 - a. Starting to perform a long working job
values 1 - a. Starting to perform a long working job
values 2 - a. Starting to perform a long working job
values 3 - a. Starting to perform a long working job
values 4 - a. Starting to perform a long working job
values 4 - b. Starting to perform a long working job
values 4 - c. Starting to perform a long working job
values 4 - d. Starting to perform a long working job
values 4 - e. Starting to perform a long working job
values 10 - Z. Starting to perform a long working job

Due to the buffer overflow, sometimes I get what I want (the second output), but other times I also get "values 10 - e. Starting to perform a long working job", which I'm not interested in.

Is there any way to enforce that, when emitting to both flows, the long work starts only once?

https://pl.kotl.in/JA1Wdhra9


Solution

  • If you want to keep 2 flows, the distinction between single and double events will have to be time-based. You won't be able to distinguish a quick string-then-number update from a "double update".

    If a time-based distinction is OK for you, using debounce before the long processing should be the way to go:

    combine(numbers, strings) { number, string -> number to string }
        .debounce(50)
        .onEach { (number, string) ->
            println("values $number - $string. Starting to perform a long working job")
        }
        .launchIn(CoroutineScope(Dispatchers.IO))
    

    Here, combine only builds pairs from the 2 flows and still receives every event; debounce then drops any value that is followed by a newer one within the timeout (50 ms here), so only the last value of a quick series reaches the long job. This also delays the start of each job by the timeout, but whether that is acceptable depends on what you want to achieve.
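A minimal, self-contained sketch of the debounce approach, assuming kotlinx.coroutines 1.6 or newer is on the classpath. The helper name debounceDemo and the shortened emission sequence are illustrative only; collecting into a list stands in for the long job so the result can be checked. Timings are real (not virtual), so the 50 ms debounce window is kept well below the 100 ms gaps between "user actions". The opt-in is there because debounce has been marked @FlowPreview.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Illustrative helper (not from the answer): runs a shortened version of the
// question's scenario and returns the pairs that reached the simulated long job.
@OptIn(FlowPreview::class)
fun debounceDemo(): List<Pair<Int, String>> = runBlocking {
    val numbers = MutableSharedFlow<Int>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val strings = MutableSharedFlow<String>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val processed = mutableListOf<Pair<Int, String>>()

    val job = launch {
        combine(numbers, strings) { number, string -> number to string }
            .debounce(50) // forward a pair only if no newer pair arrives within 50 ms
            .collect { pair -> processed.add(pair) } // stands in for the long working job
    }

    // replay = 0: emissions made before anyone subscribes are lost, so wait for the collectors.
    numbers.subscriptionCount.first { it > 0 }
    strings.subscriptionCount.first { it > 0 }

    numbers.emit(0)
    strings.emit("a")
    delay(100)
    numbers.emit(1)
    delay(100)
    strings.emit("b")
    delay(100)
    // The "double update": two emissions with no delay in between.
    numbers.emit(10)
    strings.emit("Z")
    delay(200) // longer than the debounce timeout, so the last pair is delivered

    job.cancel()
    processed
}
```

The intermediate pair (10, "b") may or may not be produced by combine, but either way it is superseded within 50 ms and never reaches the job.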

    If a time-based distinction is not OK for you, the producer needs a way to send a double event that is distinguishable from 2 single events. For this, you can use a single flow of events and define the events, for instance, like this:

    sealed class Event {
        data class SingleNumberUpdate(val value: Int): Event()
        data class SingleStringUpdate(val value: String): Event()
        data class DoubleUpdate(val num: Int, val str: String): Event()
    }
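With this event type, every emission maps to exactly one new (number, string) state, so a DoubleUpdate can only ever start the job once. A sketch of that mapping as a pure function, which can be tested without coroutines (applyEvent is a hypothetical helper name, not part of the answer):

```kotlin
sealed class Event {
    data class SingleNumberUpdate(val value: Int) : Event()
    data class SingleStringUpdate(val value: String) : Event()
    data class DoubleUpdate(val num: Int, val str: String) : Event()
}

// Hypothetical helper: the state that results from applying one event to the
// current (number, string) state. Each event yields exactly one new state.
fun applyEvent(state: Pair<Int, String>, event: Event): Pair<Int, String> = when (event) {
    is Event.SingleNumberUpdate -> state.copy(first = event.value)
    is Event.SingleStringUpdate -> state.copy(second = event.value)
    is Event.DoubleUpdate -> event.num to event.str // both values change in one step
}
```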
    

    But then you'll have to write the "combine" logic yourself (keeping the state of the latest number and string):

    // events: the single SharedFlow<Event> the producer emits to
    flow {
        // the known starting values
        var num = 0
        var str = "a"
        emit(num to str)
        events.collect { e ->
            when (e) {
                is Event.SingleNumberUpdate -> {
                    num = e.value
                }
                is Event.SingleStringUpdate -> {
                    str = e.value
                }
                is Event.DoubleUpdate -> {
                    num = e.num
                    str = e.str
                }
            }
            emit(num to str)
        }
    }
    .onEach { (number, string) ->
        println("values $number - $string. Starting to perform a long working job")
    }
    .launchIn(CoroutineScope(Dispatchers.IO))
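The same bookkeeping can also be expressed with the runningFold operator from kotlinx.coroutines, which emits the initial state and then one accumulated state per event. This is a swapped-in alternative to the hand-written flow { } builder, not part of the original answer; the helper name runEventDemo, the buffer size, and the event sequence are illustrative assumptions, and kotlinx.coroutines 1.6 or newer is assumed.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

sealed class Event {
    data class SingleNumberUpdate(val value: Int) : Event()
    data class SingleStringUpdate(val value: String) : Event()
    data class DoubleUpdate(val num: Int, val str: String) : Event()
}

// Illustrative helper: feeds a few events through runningFold and returns every
// state that would have started the long job.
fun runEventDemo(): List<Pair<Int, String>> = runBlocking {
    val events = MutableSharedFlow<Event>(extraBufferCapacity = 16)
    val processed = mutableListOf<Pair<Int, String>>()

    val job = launch {
        events
            // Starts from the known initial state, then folds in one event at a time.
            .runningFold(0 to "a") { (num, str), e ->
                when (e) {
                    is Event.SingleNumberUpdate -> e.value to str
                    is Event.SingleStringUpdate -> num to e.value
                    is Event.DoubleUpdate -> e.num to e.str
                }
            }
            .collect { state -> processed.add(state) } // stands in for the long working job
    }

    // Wait for the subscriber: events emitted before it subscribes would be lost (replay = 0).
    events.subscriptionCount.first { it > 0 }

    events.emit(Event.SingleNumberUpdate(4))
    events.emit(Event.SingleStringUpdate("e"))
    events.emit(Event.DoubleUpdate(10, "Z")) // one event, so one job run
    delay(100)

    job.cancel()
    processed
}
```

Unlike the debounce version, this is deterministic: the result is the initial state followed by exactly one state per event, and (10, "e") never appears.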