kotlin, kotlin-coroutines, ktor, kotlin-flow, ktor-client

How can I collect forever in a Kotlin Flow to implement a retry mechanism?


I'm using Kotlin flows to collect and emit from a Ktor websocket. I want to implement a retry mechanism for when httpClient.webSocket throws an exception. After the retries are exhausted, I want to call the retryWith function to run the flow block again.

The real-world scenario is this: when there is no internet connection, httpClient.webSocket throws an UnresolvedAddressException and the retry block retries it three times. The user then connects to the internet and I call the retryWith function. I want the flow block to run again, but since the flow has already completed, exceptionFlow is no longer collected.

Here is what I've tried:

fun initConnection(scope: CoroutineScope) = flow {
        coroutineScope {
            val exceptionHandler = CoroutineExceptionHandler { _, exception ->
                // Catch the exception and emit it to the shared flow.
                println("*** emiting exception $exception")
                exceptionFlow.tryEmit(exception)
            }
            launch(exceptionHandler) {
                exceptionFlow.collect {
                    println("*** throwing $it")
                    throw it //throw here to trigger retry
                }
            }

            httpClient.webSocket(
                block = {
                    receiveAll {
                        emit(it)
                        handleIncoming(it)
                    }
                },
            )
        }
    }.retry(3) { cause ->
        println("*** retrying cause: $cause")
        connectionState = ConnectionState.RETRYING

        delay(500L)

        true
    }.catch {
        println("*** caught $it")
        connectionState = ConnectionState.DISCONNECTED
        emit(Incoming.Exception(it))
        close()
    }.onCompletion {
        println("*** onCompletion $it")
    }


suspend fun retryWith(throwable: Throwable) {
    exceptionFlow.emit(throwable)
}

How can I fix this? Maybe I need to suspend the flow forever somehow, but I'm not sure that's a good idea.


Solution

  • Initial thoughts

    There are multiple "suspicious" parts in your code:

    1. Using CoroutineExceptionHandler to control the logic of your service. I think this handler is meant more for cases like logging unhandled exceptions. The main way of reacting to exceptions is try ... catch.
    2. Using exceptions for signalling and for controlling the flow of the application.
    3. It feels strange that the flow has side effects while being generated (connectionState, close(), handleIncoming()).
    4. Each time we call initConnection() it starts a new flow, so we can potentially have multiple flows running at the same time. Then we call retryWith() on the singleton service, so does that mean it should retry all previously started flows? That feels strange. Maybe it would make more sense to create a single shared flow?
    5. Sending events through flows and through callbacks (retryWith()) are two different approaches to the same problem. We can mix them, but it is usually better to use one pattern consistently. Maybe instead of calling retryWith() when the user connects to the internet, it would be better to expose an "internet connectivity state" flow and the above code would simply observe that flow?

    Also, you didn't specify when exactly retryWith() can be called and what your intended behavior is in the various cases. For example, if we are currently at the second retry, so the state hasn't yet switched to disconnected, can retryWith() be called? If yes, should the call be ignored so the second retry proceeds, or should it reset the retry counter?
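
    To make point 5 more concrete, here is a minimal sketch of the "observe a connectivity flow" idea. Everything in it is an assumption for illustration: isOnline stands in for whatever connectivity signal your platform offers, and messages() is a fake replacement for the websocket flow. It only shows the shape: the connection flow is (re)started whenever the state turns to true and cancelled when it turns to false.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the websocket flow: emits once, then stays open until cancelled.
fun messages(): Flow<String> = flow {
    emit("connected")
    awaitCancellation()
}

// Restart the connection each time connectivity comes back; stay idle while offline.
@OptIn(ExperimentalCoroutinesApi::class)
fun connection(online: Flow<Boolean>): Flow<String> =
    online.flatMapLatest { up -> if (up) messages() else emptyFlow() }

fun connectivityDemo(): List<String> = runBlocking {
    // Hypothetical: a real app would update this from a platform connectivity callback.
    val isOnline = MutableStateFlow(false)
    val received = mutableListOf<String>()
    val job = launch { connection(isOnline).collect { received += it } }
    delay(50); isOnline.value = true   // first connection
    delay(50); isOnline.value = false  // connection cancelled
    delay(50); isOnline.value = true   // second connection
    delay(50); job.cancel()
    received
}

fun main() {
    println(connectivityDemo())
}
```

    With this shape there is no separate retryWith() callback at all; the restart signal is just another flow that the connection observes.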

    Answer

    One way to provide restarting behavior for flows is to use functions like flatMapConcat(), flatMapLatest(), transformLatest(), etc. We create one flow for sending restart commands and flat-map each of its items into the flow we intend to restart. Whenever we send an item into the first flow (a restart command), it creates a new internal flow and forwards that flow's items downstream:

    restartFlow.flatMapLatest { createRestartableFlow() } 
    

    Depending on the case, we may prefer flatMapConcat() or flatMapLatest(). They differ in what happens when a restart command arrives while the internal flow is still active. The first function waits for the previously created internal flow to complete. The second cancels the previous flow and starts a new one as soon as possible. It also probably makes sense to conflate() the command flow, because when multiple restart commands arrive in quick succession, we usually want to restart only once.
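
    The difference between the two operators can be seen in a small sketch. The session() flow and the timings are made up for illustration; the second command deliberately arrives while the first session is still running.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the flow we want to restart.
fun session(id: Int): Flow<String> = flow {
    emit("session $id: start")
    delay(100)
    emit("session $id: end")
}

// Two restart commands; the second is sent 50 ms after the first.
fun commands(): Flow<Int> = flow {
    emit(1)
    delay(50)
    emit(2)
}

// flatMapConcat: session 1 runs to completion before session 2 starts.
@OptIn(ExperimentalCoroutinesApi::class)
fun concatResult(): List<String> = runBlocking {
    commands().flatMapConcat { session(it) }.toList()
}

// flatMapLatest: session 1 is cancelled as soon as command 2 arrives.
@OptIn(ExperimentalCoroutinesApi::class)
fun latestResult(): List<String> = runBlocking {
    commands().flatMapLatest { session(it) }.toList()
}

fun main() {
    println(concatResult()) // [session 1: start, session 1: end, session 2: start, session 2: end]
    println(latestResult()) // [session 1: start, session 2: start, session 2: end]
}
```

    For a websocket that stays open indefinitely, flatMapConcat() would never get to the second command, so flatMapLatest() is probably the better fit here.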

    For signalling restarts we can use MutableSharedFlow, but because we have only a single consumer (I assume we call initConnection() only once or we use a shared flow), I think it makes more sense to use a conflated channel instead.
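
    As a quick illustration of why a conflated channel suits restart commands, the sketch below (names are made up) sends three commands before anyone is listening. Each trySend() succeeds without suspending, and only the most recent item is kept.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

fun conflatedDemo(): List<Int?> = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)

    // Three commands in quick succession; each new item replaces the previous one.
    val allSucceeded = (1..3).all { channel.trySend(it).isSuccess }
    check(allSucceeded)

    val first = channel.receive()                  // the latest item
    val second = channel.tryReceive().getOrNull()  // nothing else is buffered
    listOf(first, second)
}

fun main() {
    println(conflatedDemo()) // [3, null]
}
```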

    I may be unable to provide a fully working example for you, and you may need to fine-tune it for your specific needs, but it should be something along these lines:

    private val restartChannel = Channel<Unit>(Channel.CONFLATED)
        .also { it.trySend(Unit) } // we need one item initially to start straight away
    
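    // Note: a flow created by consumeAsFlow() can be collected only once;
    // a second collection throws IllegalStateException.
    fun initConnection() = restartChannel.consumeAsFlow().flatMapLatest {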
        flow {
            httpClient.webSocket(
                block = {
                    receiveAll {
                        emit(it)
                        handleIncoming(it)
                    }
                },
            )
        }.retry(3) { cause ->
            println("*** retrying cause: $cause")
            connectionState = ConnectionState.RETRYING
    
            delay(500L)
    
            true
        }.catch {
            println("*** caught $it")
            connectionState = ConnectionState.DISCONNECTED
            emit(Incoming.Exception(it))
            close()
        }.onCompletion {
            println("*** onCompletion $it")
        }
    }
    
    fun retry() {
        // Conflated channel never suspends and is guaranteed to succeed.
        restartChannel.trySend(Unit)
    }
    

    Please be aware that I haven't tested this code at all, as that is hard without a working websocket part.
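
    One way to try the pattern out without a websocket is to swap in a fake connection. The sketch below is an illustration under assumptions, not your service: FakeService, its online flag, and connect() are all hypothetical, and the messages are plain strings. It also uses receiveAsFlow() instead of consumeAsFlow(), which is a deliberate swap so that the channel is not cancelled when collection stops.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class FakeService {
    // Hypothetical flag simulating network availability.
    var online = false

    // One initial item so the first connection attempt starts straight away.
    private val restartChannel = Channel<Unit>(Channel.CONFLATED)
        .also { it.trySend(Unit) }

    // Hypothetical stand-in for httpClient.webSocket { ... }: fails while offline.
    private fun connect(): Flow<String> = flow {
        if (!online) throw IllegalStateException("no network")
        emit("message")
    }

    @OptIn(ExperimentalCoroutinesApi::class)
    fun initConnection(): Flow<String> =
        restartChannel.receiveAsFlow().flatMapLatest {
            connect()
                .retry(3) { delay(10); true }           // three retries, then give up
                .catch { emit("error: ${it.message}") } // report the failure downstream
        }

    fun retry() {
        restartChannel.trySend(Unit)
    }
}

fun restartDemo(): List<String> = runBlocking {
    val service = FakeService()
    val received = mutableListOf<String>()
    val job = launch { service.initConnection().collect { received += it } }

    delay(200)            // the initial attempt and all three retries fail
    service.online = true // "the user connects to the internet"
    service.retry()       // restart command creates a fresh inner flow
    delay(200)
    job.cancel()
    received
}

fun main() {
    println(restartDemo()) // [error: no network, message]
}
```

    The outer flow never completes on its own, so it keeps waiting for restart commands until its collector is cancelled. That is the "collect forever" part of the question.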