I need to process all of the results from a paged API endpoint. I'd like to present all of the results as a sequence.
I've come up with the following (slightly pseudo-coded):
    suspend fun getAllRowsFromAPI(client: Client): Sequence<Row> {
        var currentRequest: Request? = client.requestForNextPage()
        return withContext(Dispatchers.IO) {
            sequence {
                while(currentRequest != null) {
                    var rowsInPage = runBlocking { client.makeRequest(currentRequest) }
                    currentRequest = client.requestForNextPage()
                    yieldAll(rowsInPage)
                }
            }
        }
    }
This functions but I'm not sure about a couple of things:
- Is the API request inside runBlocking still happening with the IO dispatcher?
Question 1: Not necessarily. sequence { } is lazy, so withContext(Dispatchers.IO) only covers building the Sequence; the body, including the API request, runs later on whichever thread iterates it. Wherever it runs, runBlocking blocks that thread, which means that no other tasks can be scheduled on it while waiting for the request to finish. There's not really any reason to use runBlocking in production code at all, because:
- If makeRequest is already a blocking call, then runBlocking will do practically nothing.
- If makeRequest is a suspending call, then runBlocking makes the code less efficient: it won't yield the thread back to the pool while waiting for the request to finish.
Whether makeRequest is a blocking or non-blocking call depends on the client you're using. Here's a non-blocking HTTP client I can recommend: https://ktor.io/clients/
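The laziness of sequence { } can be checked with the standard library alone. In this minimal sketch, a plain thread stands in for withContext(Dispatchers.IO) (an assumption made to avoid extra dependencies), and observeSequenceBodyThread is a hypothetical helper name, not part of any API:

```kotlin
import kotlin.concurrent.thread

// Returns the name of the thread that ran the sequence body,
// sampled once before and once after the sequence is iterated.
fun observeSequenceBodyThread(): Pair<String?, String?> {
    var bodyThread: String? = null
    var rows: Sequence<Int> = emptySequence()

    // Build the sequence on a separate thread (stand-in for withContext(Dispatchers.IO)).
    thread(name = "builder") {
        rows = sequence {
            bodyThread = Thread.currentThread().name // executes only during iteration
            yield(1)
        }
    }.join()

    val beforeIteration = bodyThread // still null: the body has not started yet
    rows.toList()                    // iterate on the calling thread
    return beforeIteration to bodyThread
}

fun main() {
    val (before, after) = observeSequenceBodyThread()
    println("before iteration: $before")
    println("after iteration: $after (caller: ${Thread.currentThread().name})")
}
```

The body is not run by the "builder" thread at all; it runs on the thread that calls toList(), which is where a runBlocking call inside the body would block.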
Question 2: I would use a Flow for this purpose. You can think of it as a suspendable variant of Sequence. Flows are cold, which means that a flow won't run before the consumer asks for its contents (as opposed to being hot, where the producer pushes new values whether or not the consumer wants them). A Kotlin Flow has an operator called buffer, which you can use to make it request more pages before the consumer has fully consumed the previous page.
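As a small illustration of what "cold" means, the sketch below records when the producer body actually runs. It assumes kotlinx.coroutines is on the classpath; countingFlow is a hypothetical name used only for this example, and runBlocking appears only to bridge main() into coroutine code:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Builds a flow whose body appends to the log when it starts running.
fun countingFlow(log: MutableList<String>): Flow<Int> = flow {
    log += "producer started"
    emit(1)
    emit(2)
}

fun main(): Unit = runBlocking {
    val log = mutableListOf<String>()
    val numbers = countingFlow(log)  // creating the flow runs nothing
    log += "flow created"
    val collected = numbers.toList() // collecting starts the producer
    println(log)       // [flow created, producer started]
    println(collected) // [1, 2]
}
```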
The code could look quite similar to what you already have:
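    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.flow.*

    // Client, Request and Row are the placeholder types from the question.
    // No suspend modifier is needed: a cold Flow does no work until it is collected.
    fun getAllRowsFromAPI(client: Client): Flow<Row> = flow {
        var currentRequest: Request? = client.requestForNextPage()
        while (currentRequest != null) {
            val rowsInPage = client.makeRequest(currentRequest) // assumed to be a suspending call
            emitAll(rowsInPage.asFlow())
            currentRequest = client.requestForNextPage()
        }
    }.flowOn(Dispatchers.IO)  // run the producer on the IO dispatcher
        .buffer(capacity = 1) // let the producer run ahead of the consumer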
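Note that the buffer here holds rows rather than pages: a capacity of 1 lets the producer run one row ahead of the consumer, so the next request can start while the last rows of the current page are still being processed. Increasing the capacity lets the producer fetch further ahead, but the requests are still made one at a time, in order. You should check out this talk from KotlinConf 2019 to learn more about flows: https://www.youtube.com/watch?v=tYcqn48SMT8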
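If the goal is to prefetch whole pages, one option is to emit pages, buffer those, and flatten into rows afterwards. The following is a minimal sketch under stated assumptions: FakePagedClient, fetchPage, pagesFrom and allRows are hypothetical names standing in for the question's pseudo-coded Client, and the delay only simulates network latency:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the question's Client: serves pages by index.
class FakePagedClient(private val pages: List<List<String>>) {
    // Returns null once there are no more pages.
    suspend fun fetchPage(index: Int): List<String>? {
        delay(10) // simulated network latency
        return pages.getOrNull(index)
    }
}

// Emits whole pages, so a downstream buffer counts pages rather than rows.
fun pagesFrom(client: FakePagedClient): Flow<List<String>> = flow {
    var index = 0
    while (true) {
        val page = client.fetchPage(index++) ?: break
        emit(page)
    }
}

fun allRows(client: FakePagedClient): Flow<String> =
    pagesFrom(client)
        .flowOn(Dispatchers.IO)  // fetch pages on the IO dispatcher
        .buffer(capacity = 1)    // one page may wait in the buffer while the consumer works
        .transform { page -> page.forEach { emit(it) } } // flatten pages into rows

fun main(): Unit = runBlocking {
    val client = FakePagedClient(listOf(listOf("a", "b"), listOf("c", "d"), listOf("e")))
    println(allRows(client).toList()) // [a, b, c, d, e]
}
```

With this layout the producer can have one page sitting in the buffer and one more fetched and waiting to be emitted, while the consumer still sees a plain flow of rows.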