Search code examples
javaspring-webfluxspring-kafkaproject-reactorreactor-kafka

How to get a list of messages with reactor-kafka receiver?


When we deal with synchronous approach we can receive from kafka a list of messages easily:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> testContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(Map.of(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
    ...
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)),
            new StringDeserializer(),
            new StringDeserializer());
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    return factory;
}

@KafkaListener(topics = {"topic"}, containerFactory = "testContainerFactory", groupId = "group-id")
public void receiveMessages(List<String> messages) {
    processorService.process(messages); // imagine that processorService  works with lists more efficiently than with multiple calls by one message
}

If I understand correctly it is guaranteed that the list will not exceed 500 messages.

I need a similar behavior with a reactive solution for my app. I know that we can use reactor-kafka:

@Bean
public ApplicationRunner runner() {
    return args -> {
        ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                       ...
                       ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500))
        .withKeyDeserializer(new StringDeserializer())
        .withValueDeserializer(new StringDeserializer())
        .subscription(Collections.singletonList("topic"));
        KafkaReceiver.create(ro)
                .receive()
                .doOnNext(message -> processorService.process(List.of(message)))
                .subscribe();
    };
}

As .receive() returns a Flux I need to wrap it to a list to feed processorService.process with a real list (not with only one value). Is it ok to collect flux to a list like this in terms of kafka efficiency?

        KafkaReceiver.create(ro)
                .receive()
                .collectToList()
                .doOnNext(processorService::process))
                .subscribe();

How many items will be in the list in such a case? Will it not exceed 500 as we set ConsumerConfig.MAX_POLL_RECORDS_CONFIG=500 in ReceiverOptions, right?


Solution

  • .receiveBatch() was recently added; it is available in 1.3.21.

    https://github.com/reactor/reactor-kafka/commit/9597c9388f33b03817c439159a81807ab6300123