Search code examples
kotlinvert.xkotlin-coroutinesvertx-verticle

Kotlin Vertx with Coroutines blocks when trying to call run blocking


I am using a third party library which has exposed a callback function. The call back function will be called upon success . The callback function is not a suspend function, but when I trying to make a call inside the non suspend function to return the result of a suspend function which makes IO call using aysnc and await , the call never gets compelted. Below I have come up with a simple code snippet which demonstrates the problem.

open class TestVerticle: CoroutineVerticle() {

  override suspend fun start() {

    awaitBlockingExample()

  }

 fun awaitBlockingExample():String {

    val future= async(vertx.dispatcher()) {

        makeSuspendFunCall()
     }
     val result:String= runBlocking(vertx.dispatcher()){future.await()}
     println(" The final Result is $result")
     return result
   }

  suspend fun makeSuspendFunCall():String{
    println("Comming here 3")
    delay(500)
    val resp="Test"
    return resp
  }

}
fun main(args: Array<String>) = runBlocking {
    Vertx.vertx().deployVerticle("TestVerticle")
}

The program runs fines if I remove the delay function in makeSuspendFunCall, but it will get hung if I add the delay function. I am actually simulating suspend function network call using the delay function here. How can I get result from awaitBlockingExample in this scenario ? I clearly understand that by making awaitBlockingExample as suspend function I can make this work and remove the async and run blocking calls inside. But here awaitBlockingExample (non suspend function) represents a implementation provided by a this party library where its overridden in our implementation . For example , guava cache provides a reload function, I would like to override the reload function (non suspend function) and call a coroutine function from the reload method to refresh cache value from database or network call.


Solution

  • The problem is that the vertx.dispatcher() uses a single thread as an event loop and runBlocking blocks this thread.

    Details:

    Your awaitBlockingExample() function is running on this Vertx event loop thread, because it is triggered from the suspend start() function. If you invoke runBlocking() this Vertx-thread is blocked and never released. But your other coroutines, e.g. async(), now have no thread to do their work.

    Solution:

    I assume that the invocation of awaitBlockingExample from the start function happens only in this example. In reality I would assume that the external callback uses its own thread. Then there is no problem at all, because now the external thread is blocked:

    override suspend fun start() {
    
        //simulate own thread for external callback
        thread {
            awaitBlockingExample()
        }
    }
    
    fun awaitBlockingExample():String {
    
        val future= async(vertx.dispatcher()) {
    
            makeSuspendFunCall()
        }
        val result:String= runBlocking(vertx.dispatcher()){future.await()}
        println(" The final Result is $result")
        return result
    }
    

    BTW: You don't require the async() block, you can directly call the makeSuspendFunCall() from runBlocking()

    fun awaitBlockingExample():String = runBlocking(vertx.dispatcher()){
        val result = makeSuspendFunCall()
        println(" The final Result is $result")
        result
    }