Flow's onEach documentation says:
Returns a flow that invokes the given action before each value of the upstream flow is emitted downstream.
Is there a way to do the same, but after each value of the upstream flow is emitted downstream? Preferably without using delay(long).
Flow is really a very simple interface, so we can easily create such an operator ourselves. All we need to do is collect the upstream flow and, for each item, first emit it to the downstream flow and then call the provided action:
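import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector

fun <T> Flow<T>.onEachAfter(action: suspend (T) -> Unit): Flow<T> = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        this@onEachAfter.collect {
            collector.emit(it) // pass the value downstream first
            action(it)         // then run the action
        }
    }
}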
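The kotlinx.coroutines documentation describes the Flow interface as not stable for inheritance and points to the flow { ... } builder instead. A minimal sketch of the same idea written with that builder might look like the following. The names onEachAfterViaBuilder and recordEvents are hypothetical, chosen only for this illustration, and the wildcard import assumes a reasonably recent kotlinx.coroutines version.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical name, chosen so it does not clash with onEachAfter.
fun <T> Flow<T>.onEachAfterViaBuilder(action: suspend (T) -> Unit): Flow<T> = flow {
    // Collect the upstream flow (the receiver of this extension function).
    this@onEachAfterViaBuilder.collect { value ->
        emit(value)   // hand the value to the downstream collector first
        action(value) // then run the action once emit has returned
    }
}

// Hypothetical helper: records the order of events so it can be inspected.
fun recordEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(1, 2)
        .onEachAfterViaBuilder { events.add("after $it") }
        .collect { events.add("collect $it") }
    events
}

fun main() {
    println(recordEvents()) // [collect 1, after 1, collect 2, after 2]
}
```

Because a plain flow is sequential, emit only returns once the downstream collector has finished with the value, which is why the action runs afterwards.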
We can make it simpler by using the convenient transform operator as our building block:
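import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.transform

fun <T> Flow<T>.onEachAfter(action: suspend (T) -> Unit): Flow<T> = transform {
    emit(it)   // pass the value downstream first
    action(it) // then run the action
}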
Usage:
suspend fun main() {
    flowOf(1, 2, 3)
        .onEachAfter { println("On each after: $it") }
        .collect { println("Collect: $it") }
}
----------------
Collect: 1
On each after: 1
Collect: 2
On each after: 2
Collect: 3
On each after: 3
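One consequence worth noting: because the action runs after emit returns, it is skipped whenever the downstream stops collecting from inside emit. The sketch below illustrates this with first(), which aborts the flow as soon as it has received its value. The helper valuesSeenByAction is a hypothetical name used only here.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Same operator as in the answer, repeated so the sketch is self-contained.
fun <T> Flow<T>.onEachAfter(action: suspend (T) -> Unit): Flow<T> = transform {
    emit(it)
    action(it)
}

// Hypothetical helper: returns the values for which the action actually ran.
fun valuesSeenByAction(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    // first() aborts collection from inside emit once it has its value,
    // so the line after emit(it) is never reached for that value.
    val head = flowOf(1, 2, 3)
        .onEachAfter { seen.add(it) }
        .first()
    check(head == 1)
    seen
}

fun main() {
    println(valuesSeenByAction()) // []
}
```

The same applies if the collector throws: the action for the value being processed does not run.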
Please be aware that this might be seen as a code smell or bad practice. In most cases, we should treat producers and consumers as separate things, and we shouldn't assume any specific ordering in which they perform their actions. If we require a specific ordering, that may indicate we designed our application incorrectly. This may also be the reason such an operator wasn't introduced in the first place.
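To illustrate why relying on this ordering is fragile, here is a small sketch in which a buffer() is placed between the operator and the collector. With the buffer, the upstream and the collector run in separate coroutines, so "after" only means "after the value was handed to the buffer", not "after the collector processed it". The helper recordWithBuffer is a hypothetical name, and the exact interleaving it prints should not be relied upon.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Same operator as in the answer, repeated so the sketch is self-contained.
fun <T> Flow<T>.onEachAfter(action: suspend (T) -> Unit): Flow<T> = transform {
    emit(it)
    action(it)
}

// Hypothetical helper: records events from both sides of the buffer.
// runBlocking keeps both coroutines on one thread, so the plain list is safe here.
fun recordWithBuffer(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEachAfter { events.add("after $it") }
        .buffer() // upstream and collector now run in separate coroutines
        .collect { events.add("collect $it") }
    events
}

fun main() {
    // "after N" entries may now appear before the matching "collect N".
    println(recordWithBuffer())
}
```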