Search code examples
kotlinkotlin-flow

kotlin Flow, how to use the override collect function


Learning the FLow, saw a code returns Flow. But dont understand how the passed in "block" is called by calling the

flow.collect { value ->
    println(value)
}

the related code;

internal inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}

suspend fun FlowCollector<Int>.loopFunc() {
    for (i in 1..5) {
        emit(i)
    }
}

val flow: Flow<Int> = unsafeFlow<Int> {
    loopFunc()
}

flow.collect { value ->
    println(value)
}

Solution

  • Disclaimer: after discussing this in comments, apparently the confusion came mostly from trailing lambdas and SAM conversions. Author also asked how this code actually works.

    Let's start with basics:

    • FlowCollector - consumer of items from a flow. It is a passive side, items are emitted (pushed) into it.
    • Flow - source of items. They can be consumed by providing a collector to it. Then the flow emits into our collector.
    • loopFunc() - we can call it a generator function. It receives a collector and produces items by emitting them into that collector. It is somehow similar to a flow, because it accepts a collector and emits to it. But it is not a standard way to represent a source of items - usually, we use a flow.
    • unsafeFlow() - it converts this generator function into a flow. It creates a flow and if we collect from it (provide a collector to it), the only thing it does is delegating to a stored generator function.

    Now, explaining this code:

    flow.collect { value ->
        println(value)
    }
    

    It uses trailing lambda syntax and SAM conversion. It is the same as:

    flow.collect(object : FlowCollector<Int> {
        override suspend fun emit(value: Int) {
            println(value)
        }
    })
    

    Summing everything up, the code flow is like this:

    1. We create a flow by using unsafeFlow(). This flow stores loopFunc() generator function and simply delegates to it when collected.
    2. We create a collector which simply prints items emitted into it.
    3. We collect from the flow by providing the collector to it.
    4. Flow created in unsafeFlow() simply passes the collector to loopFunc().
    5. loopFunc() runs a loop, calling our collector with subsequent items.
    6. When the loop ends, we return from loopFunc() and then from the flow.collect() function, so the code can continue.

    The whole code could be really simplified to this:

    FlowCollector<Int> { value -> println(value) }.loopFunc()
    

    But it uses a Flow in the middle which adds complexity.