Search code examples
androidkotlinkotlin-coroutineskotlin-flow

Cancel a child kotlin coroutine flow without cancelling parent


I'm trying to add a cancel action feature to a MVI coroutine flow implementation. The unified data flow (UDF) starts with SharedFlows for events that are then flatMapped to other flows that return results to pass to the reducer. How can I manage to have a job for each flatMapped action that I can cancel with another event. Current code to be refactored to add cancellable feature.

MyViewModel: ViewModel() {

    val inputs: MutableSharedFlow<I> = MutableSharedFlow()
    private val viewModelListener: MutableStateFlow<Output> = MutableStateFlow(initialState)
    private lateinit var job: Job
    
    fun observe(): StateFlow<Output> = viewModelListener.asStateFlow()
    
    private fun activate() {
       job = viewModelScope.launch(dispatcher) {
            inputs.flatMapMerge { cancellableAction(it) } // how to make this flow cancellable?
            .flatMapConcat { flowOf(it, ProgressOutcome(false)) }
            .onStart { emit(ProgressOutcome(true)) }
            .catch { cause -> emit(ErrorOutcome(cause)) }
            .flowOn(dispatcher)
            .collect { viewModelListener.emit(it) }
    }

    final override fun onCleared(): Unit = job.cancel()
}

Solution

  • To make the flows generated by cancellableAction cancellable, you can use the takeWhile operator with a flag that allows you to cancel the ongoing operations when needed. You can introduce a shared mutable state variable to manage this flag.

        class MyViewModel : ViewModel() {
    
        val inputs: MutableSharedFlow<I> = MutableSharedFlow()
        private val viewModelListener: MutableStateFlow<Output> = MutableStateFlow(initialState)
        private lateinit var job: Job
    
        // Introduce a shared flag to control cancellation
        private val cancellationFlag = MutableStateFlow(false)
    
        fun observe(): StateFlow<Output> = viewModelListener.asStateFlow()
    
        private fun activate() {
            job = viewModelScope.launch(dispatcher) {
                inputs.flatMapMerge { cancellableAction(it) }
                    .flatMapConcat { flowOf(it, ProgressOutcome(false)) }
                    .onStart { emit(ProgressOutcome(true)) }
                    .catch { cause -> emit(ErrorOutcome(cause)) }
                    .flowOn(dispatcher)
                    .takeWhile { !cancellationFlag.value } // Cancellable point
                    .collect { viewModelListener.emit(it) }
            }
        }
    
        // Function to cancel the ongoing operations
        fun cancel() {
            cancellationFlag.value = true
        }
    
        final override fun onCleared(): Unit {
            cancel() // Cancel the ongoing operations when the ViewModel is cleared
            job.cancel()
        }
    
    }
    

    Introduced a "cancellationFlag" as a MutableStateFlow to control the cancellation of ongoing operations. In the "activate" function, using the "takeWhile" operator to stop collecting items from the flow when cancellationFlag becomes true. This makes the flow cancellable at this point.Added a "cancel" function that can be called externally to set the cancellationFlag to true, canceling the ongoing operations. In the onCleared override, called the cancel function to ensure that the ongoing operations are canceled when the ViewModel is cleared, and then cancelling the job.

    Now, you can call the cancel function to stop the ongoing flows whenever you need to cancel them.