kotlin project-reactor arrow-kt

How to execute asynchronously inside an Arrow + Reactor Monad comprehension


In the following piece of code, every helloX() method runs asynchronously (each one is a deferred Mono that runs on a separate thread; see the full code below):

    override fun helloEverybody(): Kind<ForMonoK, String> {
        return MonoK.monad().fx.monad {
            val j = !helloJoey()
            val j2 = !helloJohn()
            val j3 = !helloMary()
            "$j and $j2 and $j3"
        }.fix()
    }

However, in the logs I see that they run sequentially:

14:10:46.983 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
14:10:47.084 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey()
14:10:49.087 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey() - ready
14:10:49.090 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn()
14:10:54.091 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn() - ready
14:10:54.092 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary()
14:10:59.095 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary() - ready
hello Joey and hello John and hello Mary

How could I make them execute in parallel and aggregate all the results in the monad comprehension once all of them have finished?

Full code with main() method:

class HelloServiceImpl : HelloService<ForMonoK> {

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun helloEverybody(): Kind<ForMonoK, String> {
        return MonoK.monad().fx.monad {
            val j = !helloJoey()
            val j2 = !helloJohn()
            val j3 = !helloMary()
            "$j and $j2 and $j3"
        }.fix()
    }

    override fun helloJoey(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloJoey()")
            sleep(2000)
            logger.info("helloJoey() - ready")
            Mono.just("hello Joey")
        }.subscribeOn(Schedulers.elastic()).k()
    }

    override fun helloJohn(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloJohn()")
            sleep(5000)
            logger.info("helloJohn() - ready")
            Mono.just("hello John")
        }.subscribeOn(Schedulers.elastic()).k()
    }

    override fun helloMary(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloMary()")
            sleep(5000)
            logger.info("helloMary() - ready")
            Mono.just("hello Mary")
        }.subscribeOn(Schedulers.elastic()).k()
    }

}

fun main() {
    val countDownLatch = CountDownLatch(1)
    HelloServiceImpl().helloEverybody().fix().mono.subscribe {
        println(it)
        countDownLatch.countDown()
    }
    countDownLatch.await()
}

UPDATE

I've adapted the method to combine a sequential operation with a parallel one:

    override fun helloEverybody(): Kind<ForMonoK, String> {
        return MonoK.async().fx.async {
            val j = helloJoey().bind()
            val j2 = Dispatchers.IO
                    .parMapN(helloJohn(), helloMary()){ it1, it2 -> "$it1 and $it2" }
            "$j and $j2"
        }
    }

Unfortunately, parMapN can't be used with ForMonoK:

Type inference failed: fun <A, B, C, D> CoroutineContext.parMapN(fa: Kind<ForIO, A>, fb: Kind<ForIO, B>, fc: Kind<ForIO, C>, f: (A, B, C) -> D): IO<D>
cannot be applied to
receiver: CoroutineDispatcher  arguments: (Kind<ForMonoK, String>,Kind<ForMonoK, String>,Kind<ForMonoK, String>,(String, String, String) -> String)

Ideas?


Solution

  • flatMap, like map, has no threading or parallelism semantics: inside an fx block, each bound step starts only after the previous one has produced its value. What you're after is parMap or parTraverse, which run several MonoK values in parallel.

    At that point the fx block becomes unnecessary, as it's designed for sequential operations. That said, you can mix and match both:

    MonoK.async().fx.async {
    
      val result = 
        Dispatchers.IO
         .parMap(helloJoey(), helloMary()) { joe, mary -> ... }
         .bind()
    
      otherThing(result).bind()
    
    }
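If the Arrow parallel combinators are not available for MonoK in your version, one alternative is to combine the underlying Monos with Reactor's own Mono.zip. Note that this swaps in a plain Reactor operator rather than Arrow's parMap. The sketch below is only an illustration: hello() is a hypothetical stand-in for helloJoey(), helloJohn() and helloMary(), the delays are shortened, and Schedulers.boundedElastic() assumes Reactor 3.3 or later (the question uses Schedulers.elastic()).

```kotlin
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// Hypothetical stand-in for helloJoey()/helloJohn()/helloMary(): a deferred,
// blocking greeting that runs on its own worker thread once subscribed.
fun hello(name: String, delayMillis: Long): Mono<String> =
    Mono.fromCallable {
        Thread.sleep(delayMillis)
        "hello $name"
    }.subscribeOn(Schedulers.boundedElastic())

// Roughly what the comprehension amounts to: each flatMap subscribes to the
// next Mono only after the previous one has emitted, so the delays add up.
fun helloEverybodySequential(): Mono<String> =
    hello("Joey", 200).flatMap { j ->
        hello("John", 500).flatMap { j2 ->
            hello("Mary", 500).map { j3 -> "$j and $j2 and $j3" }
        }
    }

// Mono.zip subscribes to all three sources up front and emits once every one
// of them has produced a value, so the work overlaps instead of queueing.
fun helloEverybodyParallel(): Mono<String> =
    Mono.zip(hello("Joey", 200), hello("John", 500), hello("Mary", 500))
        .map { t -> "${t.t1} and ${t.t2} and ${t.t3}" }

fun main() {
    val start = System.currentTimeMillis()
    println(helloEverybodyParallel().block())
    println("parallel took ${System.currentTimeMillis() - start} ms")
}
```

In the service from the question, the zipped Mono could presumably be wrapped back into a MonoK with .k(), the same way the helloX() methods already do, and then bound inside an fx block alongside any steps that really are sequential.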