Spring Boot environment listening to kafka topics(@KafkaListener / @StreamListener) Configured the listener factory to operate in batch mode:
ConcurrentKafkaListenerContainerFactory # setBatchListener
or via application.properties
:
spring.kafka.listener.type=batch
How to configure the framework so that given two numbers: N and T, it will try to fetch N records for the listener but won't wait more than T seconds, like described here: https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
Some properties I've looked at:
max-poll-records
ensures one won't get more than N records in a batchfetch-min-size
gets at least this amount of data in a fetch requestfetch-max-wait
but don't wait more than necessaryidleBetweenPolls
just sleep a bit between pollsIt seems like fetch-min-size
combined with fetch-max-wait
should do it but they compare bytes, not messages/records.
It is obviously possible to implement that by hand, I'm looking whether it's possible to configure Spring to to that for me.
It seems like
fetch-min-size
combined withfetch-max-wait
should do it but they compare bytes, not messages/records.
That is correct, unfortunately, Kafka provides no mechanism such as fetch.min.records
.
I don't anticipate that Spring would layer this functionality on top of the kafka-clients; it would be better to ask for a new feature in Kafka itself.
Spring does not manipulate the records returned from the poll at all, except you can now specify subBatchPerPartition
to get batches containing just one partition in order to properly support zombie fencing when using exactly once read/prcess/write.