Search code examples
kotlinkotlin-coroutineskotlin-stateflow

Kotlin Coroutines StateFlow not emitting identical values consecutively—How to force collection or work around this behavior?


I'm encountering a situation where a MutableStateFlow in my Kotlin code isn't emitting the same value twice in a row, even when I explicitly call emit() with the same value. This is preventing my collectors from being notified as expected.

Here's a minimal example:

private val dragOffset = MutableStateFlow(Offset.Zero)
val offset = Offset(100F, 100F)
dragOffset.emit(offset)  // Collector prints this
dragOffset.emit(offset)  // Collector doesn't print this

dragOffset.collect { dragOff ->
    println(dragOff)
}

the question is Are there any best practices or recommended approaches for working with StateFlows when re-emitting identical values is necessary?

I'd appreciate any insights or guidance from the community on how to best address this issue.


Solution

  • It is not possible to collect the same value twice in a row with state flows, because they're design specifically to react to a change of state.

    If your value does not represent an "absolute state". (Ex: the position of an element inside a window), then state flow might not be what your require.

    However, its parent class: SharedFlow might be what you need. Like state flows, shared flows are hot, infinite data stream that broadcast emitted value to all their collectors.

    Shared flow are more configurable than Stateflow, and can therefore be adapted to more cases.

    For example, you can instruct a shared flow to replay the latest emitted value to consumers, and to drop previously cached value. That should emulate a single state accepting duplicates.

    Let's look at an example:

    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    
    
    fun CoroutineScope.prepareFlow(): SharedFlow<Int> {
        // Prepare the flow holding only a single value at any time.
        val flow = MutableSharedFlow<Int>(1)
        launch {
            // For this particular example, wait for a subscriber before emitting any value.
            // We want to be sure the collector receives both emitted values.
            flow.subscriptionCount.filter { it > 0 }.first()
            // Emit the same value twice
            flow.emit(1)
            flow.emit(1)
        }
        return flow.asSharedFlow()
    }
    
    fun main() : Unit = runBlocking {
        val flow = prepareFlow()
    
        // The first consumer will see both values
        launch {
            flow.collect { println("C1: $it") }
        }
    
        // Add a second consumer a little later: it sees only the latest state
        delay(10)
        launch {
            flow.collect { println("C2: $it") }
        }
    }