spring-webflux, spring-cloud-stream, reactor, spring-cloud-stream-binder-kafka

How can I increase topic consumer throughput when using `Function<Flux<String>, Flux<String>>`?


I have a service based on WebFlux that consumes messages from a Kafka topic and then produces messages. My code looks like this:

```java
@Bean
public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
    return flux -> flux.map(val -> val.toUpperCase());
}
```

What I found is that with 2 instances I can consume only about 750 messages per 30 minutes, yet my CPU usage never goes above 10%. As time goes by, the consumer lag keeps increasing, so I'm wondering how I can increase the consumer throughput. According to the documentation, the concurrency setting doesn't take effect for reactive functions (link).

Does anyone know how I could increase the throughput without adding more instances?


Solution

  • As I'm using Kotlin, what I found works is `Flow.flatMapMerge(parallelCount)`, which lets up to `parallelCount` messages be processed concurrently instead of one at a time.
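
The answer does not include code. For the Java function in the question, a rough Reactor counterpart might look like the sketch below. It assumes the per-message work is slow or blocking (which would fit the low CPU usage), and it swaps in Reactor's `flatMap` with a concurrency argument in place of Kotlin's `flatMapMerge`. The names `ParallelUpperCase`, `PARALLEL_COUNT`, and `handle` are hypothetical and exist only for illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ParallelUpperCase {

    // Hypothetical knob: maximum number of messages in flight at once.
    static final int PARALLEL_COUNT = 8;

    // Stand-in for the real per-message work (assumed to be slow or blocking).
    static String handle(String val) {
        return val.toUpperCase();
    }

    // Same shape as reactiveUpperCase() in the question; in a Spring
    // configuration class this would be a non-static @Bean method.
    public static Function<Flux<String>, Flux<String>> reactiveUpperCase() {
        return flux -> flux.flatMap(
                // Run each message on a worker thread so slow calls overlap.
                val -> Mono.fromCallable(() -> handle(val))
                        .subscribeOn(Schedulers.boundedElastic()),
                // Upper bound on concurrently processed messages.
                PARALLEL_COUNT);
    }

    public static void main(String[] args) {
        List<String> out = reactiveUpperCase()
                .apply(Flux.just("a", "b", "c"))
                .collectList()
                .block(Duration.ofSeconds(5));
        // Output order may differ from input order.
        System.out.println(out);
    }
}
```

Note that `flatMap` emits results as they complete, so output order can differ from input order. If ordering matters, `flatMapSequential` with the same concurrency argument is one option to consider.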