Tags: kotlin, kotlin-coroutines, jooq

How to run blocking Java code concurrently in Kotlin?


I'm working on a new side project to learn Kotlin more deeply, and I'm having trouble figuring out how to mix Kotlin-style concurrency with code that wasn't written with coroutines in mind (JOOQ in this case). The function below is in one of my DAOs, and in the map block I want to update a bunch of rows in the DB. In this specific example each update depends on the previous one completing, so they have to run sequentially. Still, I'd like to know how this code could be modified to run the updates in parallel, since I will undoubtedly have use cases that don't need to run sequentially.

suspend fun updateProductChoices(choice: ProductChoice) = withContext(Dispatchers.IO) {
    ctx().transaction { config ->
        val tx = DSL.using(config)

        val previousRank = tx.select(PRODUCT_CHOICE.RANK)
                .from(PRODUCT_CHOICE)
                .where(PRODUCT_CHOICE.STORE_PRODUCT_ID.eq(choice.storeProductId))
                .and(PRODUCT_CHOICE.PRODUCT_ID.eq(choice.productId))
                .fetchOne(PRODUCT_CHOICE.RANK)

        (previousRank + 1..choice.rank).map { rank ->
            tx.update(PRODUCT_CHOICE)
                    .set(PRODUCT_CHOICE.RANK, rank - 1)
                    .where(PRODUCT_CHOICE.PRODUCT_ID.eq(choice.productId))
                    .and(PRODUCT_CHOICE.RANK.eq(rank))
                    .execute()
        }
    }
}

Would the best approach be to wrap the transaction lambda in runBlocking, wrap each update in an async, and then awaitAll the results? It may also be worth noting that JOOQ queries support executeAsync(), which returns a CompletionStage.


Solution

  • Yes, use JOOQ's executeAsync. With executeAsync, you can remove the withContext(Dispatchers.IO), because the call no longer blocks the calling thread.

    The kotlinx-coroutines-jdk8 library includes coroutine integration for CompletionStage, so you can call the suspending await() on the stage that executeAsync returns.

    To perform the updates in parallel, note that the same library can convert a CompletionStage to a Deferred with asDeferred(). If you change the call from execute() to executeAsync().asDeferred(), the map block produces a list of Deferreds, on which you can call awaitAll().
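
The difference between the two approaches can be sketched without a database. This is only an illustration under stated assumptions: fakeExecuteAsync is a hypothetical stand-in for a JOOQ query's executeAsync() (it just sleeps and reports one affected row), and kotlinx.coroutines is assumed to be on the classpath, with the CompletionStage extensions coming from kotlinx-coroutines-jdk8 or, in newer releases, from kotlinx-coroutines-core.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for query.executeAsync(): the stage completes with
// the number of affected rows. It is not part of JOOQ.
fun fakeExecuteAsync(rank: Int): CompletionStage<Int> =
    CompletableFuture.supplyAsync {
        Thread.sleep(50) // simulate the round trip to the database for this rank
        1                // pretend exactly one row was updated
    }

// Sequential: suspend on each stage before the next one is started.
suspend fun updateSequentially(ranks: IntRange): List<Int> =
    ranks.map { rank -> fakeExecuteAsync(rank).await() }

// Parallel: start every stage first, then wait for all of them together.
suspend fun updateInParallel(ranks: IntRange): List<Int> =
    ranks.map { rank -> fakeExecuteAsync(rank).asDeferred() }.awaitAll()

fun main(): Unit = runBlocking {
    println("sequential rows updated: ${updateSequentially(1..3).sum()}")
    println("parallel rows updated: ${updateInParallel(1..3).sum()}")
}
```

In the sequential version, await() is called inside map, so each stage finishes before the next is created, which matches the dependent updates in the question. In the parallel version, map only creates the Deferreds, and awaitAll() suspends until all of them have completed, returning the results in the original order.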