
Kafka reactive binder consumer concurrency option is pointless


I assume this is either a defect or wrong by design, but I want to double-check that I'm not missing anything here.

Spring version: spring-cloud-stream-binder-kafka-reactive:4.1.0

The Spring docs say the following on this topic (https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-reactive-binder/concurrency.html): "That will create N dedicated KafkaReceiver objects that generate N separate Flux implementations and then stream them to the handler method." The point is that, with reactive consumption, it makes no difference whether you consume multiple partitions with a single KafkaReceiver or with multiple receivers, as long as everything is processed by the same "handler" flux.

My expectation was that each KafkaReceiver would get its own isolated Flux. That would allow messages from different partitions to be processed in isolation from each other, giving parallel processing without losing the order within each partition, which is almost always crucial if you choose Kafka. But that is not the case: all receivers publish messages to the same flux, which makes the consumer concurrency option pointless.

I'm building a small application that republishes messages from one Kafka topic to another Kafka topic with a payload transformation. So I have a really straightforward functional bean like this (I've simplified the transformation but kept the asynchronous processing, as it's crucial here):

    @Bean
    public Function<Flux<Message<Map<String, Object>>>, Flux<Message<Map<String, Object>>>> transform() {
        return originalMessageFlux ->
                originalMessageFlux.flatMapSequential(this::doTransformation);
    }

    static ForkJoinPool executor = new ForkJoinPool();

    private Mono<Message<Map<String, Object>>> doTransformation(Message<Map<String, Object>> originalMessage) {
        CompletableFuture<Message<Map<String, Object>>> completableFuture = new CompletableFuture<>();
        executor.submit(()->{
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            completableFuture.complete(originalMessage);
        });
        return Mono.fromFuture(completableFuture);
    }

Please note that I'm using "flatMapSequential" because of that asynchronous transformation. In reality the processing time will vary from message to message, so flatMapSequential guarantees that results are emitted in the same order as the incoming events, even when a later transformation finishes first.
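For reference, Reactor's flatMapSequential subscribes to the inner publishers eagerly and only orders the emitted results, whereas concatMap subscribes to one inner publisher at a time. The sketch below assumes nothing more than reactor-core on the classpath and records the peak number of transformations in flight under each operator. The names OrderingOperators, Probe and slowEcho are hypothetical, and Mono.delay merely stands in for the asynchronous transformation.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OrderingOperators {

    /** Hypothetical probe that records the peak number of concurrent transformations. */
    static final class Probe {
        private final AtomicInteger inFlight = new AtomicInteger();
        private final AtomicInteger peakInFlight = new AtomicInteger();

        // Stand-in for doTransformation: later values finish sooner, so any
        // ordering in the output comes from the operator, not from timing.
        Mono<Integer> slowEcho(int value) {
            return Mono.defer(() -> {
                // Runs at subscription time, i.e. when the transformation starts.
                peakInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                return Mono.delay(Duration.ofMillis(50L * (5 - value)))
                        .map(tick -> {
                            // Runs when the transformation finishes, before the result is emitted.
                            inFlight.decrementAndGet();
                            return value;
                        });
            });
        }

        int peak() {
            return peakInFlight.get();
        }
    }

    // Eager subscription, results re-ordered to match the source order.
    static List<Integer> runFlatMapSequential(Probe probe) {
        return Flux.range(1, 4).flatMapSequential(probe::slowEcho).collectList().block();
    }

    // One inner publisher at a time, so order follows trivially.
    static List<Integer> runConcatMap(Probe probe) {
        return Flux.range(1, 4).concatMap(probe::slowEcho).collectList().block();
    }

    public static void main(String[] args) {
        Probe eager = new Probe();
        System.out.println(runFlatMapSequential(eager) + " peak in flight: " + eager.peak());

        Probe oneAtATime = new Probe();
        System.out.println(runConcatMap(oneAtATime) + " peak in flight: " + oneAtATime.peak());
    }
}
```

Both runs emit the values in source order; the difference is only in how many transformations overlap. If strictly one-at-a-time processing is required, concatMap is the closer fit.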

So my expectation was that the transform() function would be called once for each KafkaReceiver and that, as a result, I'd get N partitions processed in parallel, each kept in order by its own "flatMapSequential".

So far, the workaround I'm using is to group by the partition header and apply "flatMapSequential" to each group, like so:

    return originalMessageFlux ->
            originalMessageFlux
                    .groupBy(mapMessage -> (Integer) mapMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION))
                    .flatMap(groupedFlux -> groupedFlux.flatMapSequential(originalMessage -> {...}
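The shape of this workaround can be tried out without Kafka or Spring. The following is a minimal sketch that assumes only reactor-core on the classpath: the Rec class is a hypothetical stand-in for a message (in the real function the grouping key would come from the KafkaHeaders.RECEIVED_PARTITION header, as in the snippet above), and transform is a hypothetical placeholder for the asynchronous transformation.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PartitionOrderedPipeline {

    // Hypothetical stand-in for a Kafka message: just a partition and an offset.
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Placeholder for the asynchronous transformation; the delay varies per
    // record so that later records can finish before earlier ones.
    static Mono<Rec> transform(Rec rec) {
        long delayMillis = 20L * (3 - rec.offset % 3);
        return Mono.delay(Duration.ofMillis(delayMillis)).map(tick -> rec);
    }

    // One ordered lane per partition; the lanes run concurrently and are
    // merged as their results become available.
    static Flux<Rec> process(Flux<Rec> merged) {
        return merged
                .groupBy(rec -> rec.partition)
                .flatMap(lane -> lane.flatMapSequential(PartitionOrderedPipeline::transform));
    }

    // Collects the output offsets per partition, in emission order.
    static Map<Integer, List<Long>> offsetsByPartition(List<Rec> out) {
        return out.stream().collect(Collectors.groupingBy(
                rec -> rec.partition,
                Collectors.mapping(rec -> rec.offset, Collectors.toList())));
    }

    // Feeds an interleaved stream of two partitions through the pipeline.
    static Map<Integer, List<Long>> demo() {
        Flux<Rec> merged = Flux.just(
                new Rec(0, 0), new Rec(1, 0),
                new Rec(0, 1), new Rec(1, 1),
                new Rec(0, 2), new Rec(1, 2));
        return offsetsByPartition(process(merged).collectList().block());
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Within each partition the offsets come out in their original order, while records from different partitions may interleave. One thing to keep in mind with groupBy is that the outer flatMap has to be able to subscribe to every group; its default concurrency is normally well above a typical partition count, but a lower explicit concurrency than the number of partitions can stall the pipeline.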

Solution

  • The statement

    That will create N dedicated KafkaReceiver objects that generate N separate Flux implementations and then stream them to the handler method.

    is correct. The reactive Kafka binder creates as many KafkaReceiver objects as specified by the concurrency property. However, currently, this only gives the semantics of faster/concurrent processing via multiple receivers. When the records are given to the handler method, you get a merged representation of records from multiple receivers. I believe this is the issue you are running into. The workaround is what you showed above by filtering on the KafkaHeaders.RECEIVED_PARTITION header. This is exactly what this test is doing here.

    If you can create an issue in the repository and point to this SO thread, we will be happy to explore this scenario to add further semantic options for the concurrency property in the reactive Kafka binder.