android-jetpack-compose kotlin-flow kotlin-stateflow kotlin-sharedflow

Retrigger collection from StateFlow/SharedFlow from Activity in Jetpack Compose


Activity code that collects the flow as state and updates the UI based on the state emitted by the Flow:

setContent {
    val viewModel: MainViewModel = viewModel()
    val state: State<UiState> = viewModel.flow.collectAsStateWithLifecycle(
            initialValue = UiState.Loading,
            lifecycle,
            minActiveState = Lifecycle.State.STARTED
    )
    MainScreen(uiState = state.value)
}

ViewModel code that exposes the StateFlow:

val flow: Flow<UiState> = repository.flow
        .map {
            // map returns the transformed value; emit() is not available in its lambda
            if (it.datetime == "") {
                UiState.Error("Failure")
            } else {
                UiState.Success(it.datetime)
            }
        }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = UiState.Loading
        )

Cold flow exposed by the repository:

val flow = flow {
        while (true) {
            delay(5000)
            emit(getCurrentTime())
        }
    }.catch {
        // Emit Empty Object which will be treated as an error 
        emit(CurrentTime(""))
    }

If there is an exception in the upstream cold flow exposed by the Repository, there will be no more emissions from the upstream flow, because it completes. In that case we can emit a UiState.Error to the UI (shown above).

Is there a way to retrigger the upstream flow once the UI is in the error state?


Solution

  • catch is an intermediate operator that only runs after the upstream flow has already failed. Once its block finishes, the flow completes, so nothing more is emitted afterward.
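
    To see why no further values arrive, here is a minimal sketch that can be run outside Android. failingFlow is a hypothetical stand-in for the repository flow, not code from the question; it emits twice and then throws.

    ```kotlin
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.catch
    import kotlinx.coroutines.flow.flow
    import kotlinx.coroutines.flow.toList
    import kotlinx.coroutines.runBlocking

    // Hypothetical stand-in for the repository flow: emits twice, then fails.
    fun failingFlow(): Flow<String> = flow<String> {
        emit("tick 1")
        emit("tick 2")
        throw IllegalStateException("network down")
    }.catch {
        // Runs once, after the upstream block has already terminated.
        emit("error")
    }

    fun main(): Unit = runBlocking {
        // toList() only returns because the flow completes after the catch block.
        println(failingFlow().toList()) // [tick 1, tick 2, error]
    }
    ```

    The while (true) loop in the question behaves the same way: after the first exception the loop is gone, and catch can only append values to the end of the stream.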

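    You can use the retry() operator to restart the flow if an exception happens. Its lambda is only a predicate that decides whether to retry, so emit() is not available there; to emit a value before retrying, use retryWhen(), whose lambda has a FlowCollector receiver.

    val flow = flow {
        while (true) {
            delay(5000)
            emit(getCurrentTime())
        }
    }.retryWhen { _, _ ->
        // Emit an empty object, which will be treated as an error
        emit(CurrentTime(""))
        // Returning true restarts the upstream flow
        true
    }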
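
    The retry behaviour can be checked in isolation with a small sketch. It assumes a hypothetical flakySource that fails on its first two collections, and a hypothetical helper retryWithErrorMarker that emits an empty string as the error marker, waits briefly, and gives up after a fixed number of retries. The retry limit and delay values are illustrative assumptions, not part of the original code.

    ```kotlin
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.flow
    import kotlinx.coroutines.flow.retryWhen
    import kotlinx.coroutines.flow.toList
    import kotlinx.coroutines.runBlocking

    // Hypothetical source: fails on the first two collections, then succeeds.
    fun flakySource(): Flow<String> {
        var collections = 0
        return flow<String> {
            collections++
            if (collections <= 2) throw IllegalStateException("attempt $collections failed")
            emit("12:00")
        }
    }

    // Hypothetical helper: emits an error marker before each retry,
    // and stops retrying after maxRetries attempts.
    fun Flow<String>.retryWithErrorMarker(maxRetries: Long = 3): Flow<String> =
        retryWhen { _, attempt ->
            if (attempt < maxRetries) {
                emit("")                    // empty value, treated as an error downstream
                delay(100 * (attempt + 1))  // simple linear backoff (assumed values)
                true                        // re-collect the upstream flow
            } else {
                false                       // let the exception propagate downstream
            }
        }

    fun main(): Unit = runBlocking {
        println(flakySource().retryWithErrorMarker().toList()) // [, , 12:00]
    }
    ```

    Bounding the number of retries avoids an endless retry loop when the failure is permanent; whether that is desirable depends on the app.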
    

    To restart the producer flow from the UI, you need some kind of trigger flow that does not terminate.

    Here is an example implementation using a Channel:

    private val triggerChannel = Channel<Unit>(Channel.CONFLATED)
    
    private val producerFlow = flow {
        while (true) {
            delay(1000)
            emit("producing")
        }
    }.catch {
        emit("uh oh, error!")
    }
    
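    // flatMapLatest is marked @ExperimentalCoroutinesApi, so an opt-in is required
    @OptIn(ExperimentalCoroutinesApi::class)
    val uiStateFlow = triggerChannel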
        .receiveAsFlow()
        .onStart { triggerChannel.send(Unit) }
        .flatMapLatest {
            producerFlow.map {
                if (it == "producing") {
                    UiState.Success
                } else {
                    UiState.Error
                }
            }
        }.stateIn(
                scope = viewModelScope,
                started = SharingStarted.WhileSubscribed(5000),
                initialValue = UiState.Loading
        )
    
    fun retry() {
        viewModelScope.launch {
            triggerChannel.send(Unit)
        }
    }
    

    This code restarts the producer whenever Unit is sent to the channel. Keep in mind that nothing is produced until the first trigger arrives, which is why onStart { triggerChannel.send(Unit) } is needed: it sends the initial trigger when uiStateFlow is subscribed to.
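
    The trigger mechanism can also be tried without a ViewModel. The following is a minimal sketch under stated assumptions: triggeredFlow and runDemo are hypothetical names, the producer fails right after its first emission, and the collector stands in for the user tapping a retry button whenever it sees "error".

    ```kotlin
    import kotlinx.coroutines.ExperimentalCoroutinesApi
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.runBlocking

    // Restarts `producer` every time a Unit arrives on `trigger`.
    // The channel should be CONFLATED (or buffered): with a rendezvous channel
    // the send in onStart would suspend, because nobody is receiving yet.
    @OptIn(ExperimentalCoroutinesApi::class)
    fun triggeredFlow(trigger: Channel<Unit>, producer: Flow<String>): Flow<String> =
        trigger.receiveAsFlow()
            .onStart { trigger.send(Unit) } // initial trigger on subscription
            .flatMapLatest { producer }

    fun runDemo(): List<String> = runBlocking {
        val trigger = Channel<Unit>(Channel.CONFLATED)
        var runs = 0
        // Hypothetical producer: one value per run, then a failure.
        val producer = flow<String> {
            runs++
            emit("run $runs")
            throw IllegalStateException("boom")
        }.catch { emit("error") }

        val seen = mutableListOf<String>()
        triggeredFlow(trigger, producer)
            .take(4) // stop the demo after two full runs
            .collect { value ->
                seen += value
                // Simulates the user tapping "Retry" in the error state.
                if (value == "error") trigger.send(Unit)
            }
        seen
    }

    fun main() {
        println(runDemo()) // [run 1, error, run 2, error]
    }
    ```

    In the ViewModel version, the UI would call retry() from the error state instead of sending to the channel directly.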