Search code examples
kotlinkotlin-coroutinesproject-reactorreactor-kafka

How to call a reactor function from a coroutine context?


The scenario is I'm developing a Kotlin microservice that uses coroutine and one of the libraries I'm using (reactor.Kafka) is expecting a flux publisher. How can I bridge the two? (I'm using kotlinx-coroutines-reactive and kotlinx-coroutines-reactor to bridge the other way around when consuming an event I open mono{} block and call a suspending function but in this case its the other way around).

The function I'm trying to call:

kafkaSender.send(Flux.just(SenderRecord.create(record, "0")))

Solution

  • At first glance, we do not see bridging methods in kotlinx-coroutines-reactor to await from a Flux. But:

    Note that Mono and Flux are subclasses of Reactive Streams' Publisher and extensions for it are covered by the kotlinx-coroutines-reactive module.


    I think you can simply call .awaitLast().

    https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/await-last.html