Search code examples
kotlinkotlin-flowkotlin-stateflow

Can two collect be used with the same StateFlow?


I am trying to understand the use of flow in Kotlin and I was trying different codes, in particular I was with this method:

val stateFlow = MutableStateFlow(0) 

private fun stateFlowWithContinueCollect() {
    runBlocking {

        launch {
            repeat(10) {
                delay(1000)
                stateFlow.value = stateFlow.value + 1
            }
        }

        stateFlow.collect { value ->
            println("Actual value is $value")
            if (value == 6) {
                continueInSameFlow()
            }
        }
    }
}

suspend fun continueInSameFlow() {
    stateFlow.collect {
        println("Im in the second collect, value is $it")
    }
}

I understand that when I am in the first collect, it will be receiving values until it enters the second collect when value is equal to 6, so as it is a suspend function, will stay in until it is finished.

Is there any way for two collects to take data from the same StateFlow at the same time, or does that not make sense?

I have tried putting one collect under another, but apart from Android Studio giving me a warning that it does not make sense, I see that it only executes one collect but not the second one.


Solution

  • Summary

    Replace your call to continueInSameFlow() with launch { continueInSameFlow() } so you're running the second collector asynchronously in another coroutine. Collecting suspends the coroutine until the flow completes, but StateFlows never complete, so you're causing the first collection to hang.

    Detailed explanation

    flow.collect() suspends the coroutine to wait for the flow to complete emitting all the items it will ever emit. StateFlows are infinite (as are SharedFlows and many cold Flows as well.)

    Therefore, if you do something like this:

    scope.launch {
        someInfiniteFlow.collect {
            println(it)
        }
    
        println("Hello") // unreachable code
    }
    

    then "Hello" will never be printed, because collect never returns. It will suspend forever, always waiting for the next item to be emitted and then running the lambda again for each one.

    So of course, if you put another collect call in place of the println(), that will never be reached either. It doesn't matter if it's the same flow or a different one. Whatever you're doing down there is unreachable code.

    scope.launch {
        someInfiniteFlow.collect {
            println(it)
        }
    
        anyFlow.collect { // unreachable code
            println(it)
        }
    }
    

    In your sample code, this same principle applies. When you start collecting an infinite flow inside your outer collect lambda, you are calling a function that will suspend forever. Therefore, that iteration of your lambda will never return, so it will never be called again with another item to process.


    Therefore, if you want to collect flows in parallel (whether or not they are the same flow), you must do it in separate coroutines:

    scope.launch {
        someInfiniteFlow.collect {
            println(it)
        }
    }
    
    scope.launch {
        anyFlow.collect {
            println(it)
        }
    }
    

    For this reason, it is very common that a coroutine that collects a flow doesn't do anything besides collecting that flow. It's so common that there is a launchIn operator for Flows to help make code easier to read (less nested indentation). This is equivalent to the above:

    someInfiniteFlow.onEach {
        println(it)
    }.launchIn(scope)
    
    anyFlow.onEach {
        println(it)
    }.launchIn(scope)
    

    Here is how I would rewrite the code from your question so the first collection is not interrupted by the second one that starts after "6" is emitted:

    val stateFlow = MutableStateFlow(0) 
    
    private fun stateFlowWithContinueCollect() {
        runBlocking {
    
            launch {
                repeat(10) {
                    delay(1000)
                    stateFlow.value = stateFlow.value + 1
                }
            }
    
            stateFlow
                .onEach { value ->
                    println("Actual value is $value")
                }
                .launchIn(this)
    
            stateFlow
                .dropWhile { it != 6 }
                .onEach {
                    println("Im in the second collect, value is $it")
                }
                .launchIn(this)
        }
    }
    

    The launchIn calls are launching child coroutines of the runBlocking's CoroutineScope instead of synchronously collecting the flows directly in the runBlocking coroutine.

    If you want to do it your way, you can. In your code, wrap the call to continueInSameFlow() in launch { } so it is collected asynchronously to the first collector.

    Note that either way you do it, since you're collecting infinite flows, your outer function stateFlowWithContinueCollect() can never return.