Search code examples
kotliniosequencekotlin-coroutines

Kotlin wrap sequential IO calls as a Sequence


I need to process all of the results from a paged API endpoint. I'd like to present all of the results as a sequence.

I've come up with the following (slightly psuedo-coded):

suspend fun getAllRowsFromAPI(client: Client): Sequence<Row> {
    var currentRequest: Request? = client.requestForNextPage()
    return withContext(Dispatchers.IO) {
         sequence {
            while(currentRequest != null) {
                var rowsInPage = runBlocking { client.makeRequest(currentRequest) }
                currentRequest = client.requestForNextPage()
                yieldAll(rowsInPage)
            }
        }
     }
}

This functions but I'm not sure about a couple of things:

  1. Is the API request happening inside runBlocking still happening with the IO dispatcher?
  2. Is there a way to refactor the code to launch the next request before yielding the current results, then awaiting on it later?

Solution

  • Question 1: The API-request will still run on the IO-dispatcher, but it will block the thread it's running on. This means that no other tasks can be scheduled on that thread while waiting for the request to finish. There's not really any reason to use runBlocking in production-code at all, because:

    1. If makeRequest is already a blocking call, then runBlocking will do practically nothing.
    2. If makeRequest was a suspending call, then runBlocking would make the code less efficient. It wouldn't yield the thread back to the pool while waiting for the request to finish.

    Whether makeRequest is a blocking or non-blocking call depends on the client you're using. Here's a non-blocking http-client I can recommend: https://ktor.io/clients/

    Question 2: I would use a Flow for this purpose. You can think of it as a suspendable variant of Sequence. Flows are cold, which means that it won't run before the consumer asks for its contents (in contrary to being hot, which means the producer will push new values no matter if the consumer wants it or not). A Kotlin Flow has an operator called buffer which you can use to make it request more pages before it has fully consumed the previous page.

    The code could look quite similar to what you already have:

    suspend fun getAllRowsFromAPI(client: Client): Flow<Row> = flow {
        var currentRequest: Request? = client.requestForNextPage()
    
        while(currentRequest != null) {
            val rowsInPage = client.makeRequest(currentRequest)
            emitAll(rowsInPage.asFlow())
            currentRequest = client.requestForNextPage()
        }
    }.flowOn(Dispatchers.IO)
    .buffer(capacity = 1)
    

    The capacity of 1 means that will only make 1 more request while processing an earlier page. You could increase the buffer size to make more concurrent requests. You should check out this talk from KotlinConf 2019 to learn more about flows: https://www.youtube.com/watch?v=tYcqn48SMT8