I'm working on a new side project with a goal of more deeply learning Kotlin, and I'm having a little trouble figuring out how to mix Kotlin-style concurrency with code not written with coroutines in mind (JOOQ in this case). The function below is in one of my DAOs, and in that map block, I want to update a bunch of rows in the DB. In this specific example, the updates are actually dependent on the previous one completing, so it will need to be done sequentially, but I'm interested in how this code could be modified to run the updates in parallel, since there will undoubtedly be use cases I have that don't need to be run sequentially.
suspend fun updateProductChoices(choice: ProductChoice) = withContext(Dispatchers.IO) {
ctx().transaction { config ->
val tx = DSL.using(config)
val previousRank = tx.select(PRODUCT_CHOICE.RANK)
.from(PRODUCT_CHOICE)
.where(PRODUCT_CHOICE.STORE_PRODUCT_ID.eq(choice.storeProductId))
.and(PRODUCT_CHOICE.PRODUCT_ID.eq(choice.productId))
.fetchOne(PRODUCT_CHOICE.RANK)
(previousRank + 1..choice.rank).map { rank ->
tx.update(PRODUCT_CHOICE)
.set(PRODUCT_CHOICE.RANK, rank - 1)
.where(PRODUCT_CHOICE.PRODUCT_ID.eq(choice.productId))
.and(PRODUCT_CHOICE.RANK.eq(rank))
.execute()
}
}
}
Would the best way be to wrap the transaction
lambda in runBlocking
and each update
in an async
, then awaitAll
the result? Also possibly worth noting that the JOOQ queries support executeAsync()
which returns a CompletionStage
.
Yes, use JOOQ's executeAsync
. With executeAsync
, you can remove the withContext(Dispatchers.IO)
because the call is no longer blocking.
The kotlinx-coroutines-jdk8
library includes coroutines integration with CompletionStage
, so you can do a suspending await
on it (docs).
To perform the updates in parallel, note that the same library can convert a CompletionStage
to a Deferred
(docs). Therefore, if you change the call to execute
to executeAsync().asDeferred()
you will get a list of Deferred
s, on which you can awaitAll()
.