Tags: kotlin, kotlin-coroutines, kotlin-sharedflow

Should I emit from a coroutine when collecting from a different flow?


I have a use case where I need to trigger on a specific event collected from a flow and restart it when it closes. I also need to emit all of the events to a different flow. My current implementation looks like this:

scope.launch {
    val flowToReturn = MutableSharedFlow<Event>()
    while (true) {
        client
            .connect()                                   // returns Flow<Event>
            .catch { ... }                               // ignore errors
            .onEach { launch { flowToReturn.emit(it) } } // problem here
            .filterIsInstance<Event.Some>()
            .collect { someEvent ->
                doStuff(someEvent)
            }
    }
}.start()

The idea is to always reconnect when the client disconnects (collect then returns and a new iteration begins) while having the outer flow lifecycle separate from the inner (connection) one. It being a shared flow with potentially multiple subscribers is a secondary concern.

As the emit documentation states, it is not thread-safe. Should I call it from a new coroutine, then? My concern is that emit will suspend if there are no subscribers to the outer flow, and I need to run the downstream pipeline regardless.


Solution

  • The MutableSharedFlow.emit() documentation says that it is thread-safe. Maybe you were accidentally looking at FlowCollector.emit(), which is not thread-safe. MutableSharedFlow is a subtype of FlowCollector but promotes emit() to being thread-safe, since it is not intended to be used as a Flow builder receiver like a plain FlowCollector. There's no reason to launch a coroutine just to emit to your shared flow.

    There's no reason to call start() on a coroutine Job that was created with launch because launch both creates the Job and starts it.

    You will need to declare flowToReturn before your launch call so that it is in scope to be returned from the outer function.
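
Putting the three points above together, the code from the question might look roughly like the sketch below. It is only an illustration under stated assumptions: Event, Client, doStuff, events and demo are hypothetical stand-ins (the question does not show the real types), and the fake connect() emits two events and then completes to simulate a disconnect.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the types used in the question.
sealed interface Event {
    data class Some(val id: Int) : Event
    data class Other(val id: Int) : Event
}

// Hypothetical client: each connection delivers two events, then completes ("disconnects").
class Client {
    fun connect(): Flow<Event> = flow {
        delay(10) // simulated network latency; also keeps the reconnect loop from spinning
        emit(Event.Other(1))
        emit(Event.Some(2))
    }
}

fun doStuff(event: Event.Some) = println("handling $event")

// Hypothetical outer function that starts the reconnect loop and returns the shared flow.
fun CoroutineScope.events(client: Client): SharedFlow<Event> {
    // Declared before launch so that it is in scope for the return statement.
    val flowToReturn = MutableSharedFlow<Event>()
    launch { // launch creates and starts the Job; no start() call is needed
        while (isActive) { // reconnect whenever collect returns
            client.connect()
                .catch { /* ignore errors, as in the question */ }
                .onEach { flowToReturn.emit(it) } // emit directly; no nested launch
                .filterIsInstance<Event.Some>()
                .collect { doStuff(it) }
        }
    }
    return flowToReturn.asSharedFlow()
}

// Subscribes for four events, then cancels the reconnect loop.
fun demo(): List<Event> = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    try {
        withTimeout(5_000) { scope.events(Client()).take(4).toList() }
    } finally {
        scope.cancel()
    }
}

fun main() {
    println(demo())
}
```

On the concern about suspension: the SharedFlow documentation states that emit never suspends while there are no subscribers, so the downstream pipeline keeps running in that case. With the default configuration (no replay, no extra buffer), emit can still suspend while a slow subscriber is processing a value. If that matters, configuring the flow with extraBufferCapacity or a non-suspending onBufferOverflow policy is one option to consider.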