With the synchronous approach we can easily receive a list of messages from Kafka:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> testContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(
            Map.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                    ...
                    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500),
            new StringDeserializer(),
            new StringDeserializer());
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    return factory;
}
@KafkaListener(topics = {"topic"}, containerFactory = "testContainerFactory", groupId = "group-id")
public void receiveMessages(List<String> messages) {
    processorService.process(messages); // imagine that processorService works with lists more efficiently than with multiple per-message calls
}
If I understand correctly, it is guaranteed that the list will not exceed 500 messages.
I need similar behavior with a reactive solution in my app. I know that we can use reactor-kafka:
@Bean
public ApplicationRunner runner() {
    return args -> {
        ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                Map.of(
                        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                        ...
                        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500))
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer())
                .subscription(Collections.singletonList("topic"));
        KafkaReceiver.create(ro)
                .receive()
                .doOnNext(message -> processorService.process(List.of(message)))
                .subscribe();
    };
}
Since .receive() returns a Flux, I need to wrap the emitted messages into a list to feed processorService.process with a real list (not just a single value).
Is it OK to collect the Flux to a list like this, in terms of Kafka efficiency?
KafkaReceiver.create(ro)
        .receive()
        .collectList()
        .doOnNext(processorService::process)
        .subscribe();
How many items will the list contain in that case? It will not exceed 500, since we set ConsumerConfig.MAX_POLL_RECORDS_CONFIG=500 in the ReceiverOptions, right?
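Note that Flux.collectList() only emits its list once the source Flux completes, and .receive() is an endless stream, so the collected list would never actually be delivered. One common way to get bounded lists from an infinite Flux is bufferTimeout; a minimal sketch, assuming the `ro` and `processorService` from the question, with the batch size and flush timeout as illustrative values:

```java
// Sketch (not the author's code): batching an endless receive() stream with
// bufferTimeout instead of collectList(), which never emits for an infinite Flux.
// 500 and 200 ms are illustrative assumptions, not recommendations.
KafkaReceiver.create(ro)
        .receive()
        .bufferTimeout(500, Duration.ofMillis(200)) // emit a List of up to 500 records, or whatever arrived within 200 ms
        .doOnNext(processorService::process)        // process(List<...>) gets a real, bounded list
        .subscribe();
```

A buffer here may span several polls, so its size is governed by the bufferTimeout arguments rather than by max.poll.records alone.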
.receiveBatch() was recently added; it is available in 1.3.21: https://github.com/reactor/reactor-kafka/commit/9597c9388f33b03817c439159a81807ab6300123
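With receiveBatch(), each inner Flux corresponds to one poll, so it is bounded by max.poll.records and can safely be collected to a list. A hedged sketch, again assuming the `ro` and `processorService` from the question:

```java
// Sketch: receiveBatch() (reactor-kafka 1.3.21+) emits one inner Flux per poll.
// Each inner Flux is finite, so collectList() completes for every batch,
// yielding lists bounded by max.poll.records (500 in the question's config).
KafkaReceiver.create(ro)
        .receiveBatch()
        .concatMap(Flux::collectList)     // one bounded List per poll, in order
        .doOnNext(processorService::process)
        .subscribe();
```

concatMap keeps batches in order and processes them one at a time; flatMap could be used instead if out-of-order, concurrent batch processing is acceptable.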