I'm encountering a situation where a MutableStateFlow
in my Kotlin code isn't emitting the same value twice in a row, even when I explicitly call emit()
with the same value. This is preventing my collectors from being notified as expected.
Here's a minimal example:
private val dragOffset = MutableStateFlow(Offset.Zero)
val offset = Offset(100F, 100F)
dragOffset.emit(offset) // Collector prints this
dragOffset.emit(offset) // Collector doesn't print this
dragOffset.collect { dragOff ->
    println(dragOff)
}
My question is: are there any best practices or recommended approaches for working with StateFlow when re-emitting identical values is necessary?
I'd appreciate any insights or guidance from the community on how to best address this issue.
It is not possible to collect the same value twice in a row with state flows, because they're designed specifically to react to a change of state: a new value that is equal (per equals) to the current one is ignored.
If your value does not represent an "absolute state" (e.g. the position of an element inside a window), then a state flow might not be what you require.
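To make the conflation concrete, here is a minimal sketch that records what a collector observes on a MutableStateFlow. The helper name observedStateValues is hypothetical, and the yield() calls are only there to give the collector a turn on the single-threaded runBlocking event loop.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper (not part of any library): returns every value the collector observed.
fun observedStateValues(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val collector = launch { state.collect { seen += it } }
    yield() // let the collector subscribe and receive the initial value 0

    state.value = 1; yield() // new state: delivered
    state.value = 1; yield() // equal to the current state: ignored
    state.value = 2; yield() // new state: delivered

    collector.cancel() // a state flow never completes, so stop collecting explicitly
    seen.toList()
}

fun main() {
    println(observedStateValues()) // prints [0, 1, 2]
}
```

The second assignment of 1 never reaches the collector, which matches the behaviour described in the question.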
Its parent interface, SharedFlow, might be what you need instead. Like state flows, shared flows are hot, infinite data streams that broadcast emitted values to all their collectors.
Shared flows are more configurable than StateFlow, and can therefore be adapted to more cases.
For example, you can instruct a shared flow to replay the latest emitted value to new consumers, and to drop previously cached values. That should emulate a single state accepting duplicates.
Let's look at an example:
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun CoroutineScope.prepareFlow(): SharedFlow<Int> {
    // Prepare the flow holding only a single value at any time.
    val flow = MutableSharedFlow<Int>(replay = 1)
    launch {
        // For this particular example, wait for a subscriber before emitting any value.
        // We want to be sure the collector receives both emitted values.
        flow.subscriptionCount.filter { it > 0 }.first()
        // Emit the same value twice
        flow.emit(1)
        flow.emit(1)
    }
    return flow.asSharedFlow()
}

fun main(): Unit = runBlocking {
    val flow = prepareFlow()
    // The first consumer will see both values
    launch {
        flow.collect { println("C1: $it") }
    }
    // Add a second consumer a little later: it sees only the latest state
    delay(10)
    launch {
        flow.collect { println("C2: $it") }
    }
    // Note: shared flow collectors never complete, so this program keeps running until it is stopped.
}
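As a variant of the example above, the sketch below assumes you also want emissions never to suspend, so it sets onBufferOverflow to BufferOverflow.DROP_OLDEST and uses tryEmit. The helper name observedSharedValues is hypothetical. It returns what an already subscribed collector saw, together with the replay cache that a late subscriber would receive.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper (not part of any library).
// First list: values seen by an active collector. Second list: what a late subscriber would be replayed.
fun observedSharedValues(): Pair<List<Int>, List<Int>> = runBlocking {
    // Keep only the latest value; when the buffer is full, drop the oldest value instead of suspending.
    val flow = MutableSharedFlow<Int>(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val seen = mutableListOf<Int>()
    val collector = launch { flow.collect { seen += it } }
    yield() // let the collector subscribe before anything is emitted

    flow.tryEmit(1); yield() // delivered
    flow.tryEmit(1); yield() // same value again: delivered as well

    collector.cancel() // a shared flow never completes, so stop collecting explicitly
    seen.toList() to flow.replayCache
}

fun main() {
    println(observedSharedValues()) // prints ([1, 1], [1])
}
```

Unlike a StateFlow, a shared flow has no initial value and no value property, so a subscriber receives nothing until the first emission. If the collector is slower than the emitter, DROP_OLDEST means it may skip intermediate values and only see the latest one.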