Tags: spring-kafka, spring-cloud-stream-binder-kafka

Spring Kafka batch within time window


I have a Spring Boot application listening to Kafka topics (@KafkaListener / @StreamListener), and I have configured the listener container factory to operate in batch mode:

ConcurrentKafkaListenerContainerFactory#setBatchListener(true)

or via application.properties:

spring.kafka.listener.type=batch

How can I configure the framework so that, given two numbers N and T, it tries to fetch N records for the listener but never waits more than T seconds, as described here: https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
Some properties I've looked at:

  • max-poll-records: ensures a batch never contains more than N records
  • fetch-min-size: the minimum amount of data the broker should return for a fetch request
  • fetch-max-wait: the maximum time the broker waits for fetch-min-size to be satisfied before responding anyway
  • idleBetweenPolls: simply sleeps for a while between polls
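For reference, the first three Spring Boot properties map onto standard kafka-clients consumer settings: spring.kafka.consumer.max-poll-records to max.poll.records, spring.kafka.consumer.fetch-min-size to fetch.min.bytes, and spring.kafka.consumer.fetch-max-wait to fetch.max.wait.ms. idleBetweenPolls, by contrast, is a Spring listener-container setting, not a Kafka one. The sketch below builds the raw consumer settings as a plain map; the helper name and the example values are assumptions for illustration only. It makes visible that only max.poll.records is counted in records.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerTuning {
    // Hypothetical helper: raw kafka-clients settings roughly equivalent to the
    // Spring Boot properties spring.kafka.consumer.max-poll-records,
    // fetch-min-size and fetch-max-wait.
    static Map<String, Object> batchTuning(int maxRecords, int minBytes, int maxWaitMs) {
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", maxRecords); // upper bound, counted in RECORDS
        props.put("fetch.min.bytes", minBytes);    // lower bound, counted in BYTES
        props.put("fetch.max.wait.ms", maxWaitMs); // broker waits at most this long for fetch.min.bytes
        return props;
    }
}
```

Because the lower bound is expressed in bytes, the number of records it corresponds to depends entirely on record size, which is why these settings cannot express "at least N records".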

It seems like fetch-min-size combined with fetch-max-wait should do it, but they compare bytes, not messages/records.

It is obviously possible to implement this by hand; I'm asking whether Spring can be configured to do it for me.
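A minimal sketch of the by-hand approach, assuming records are handed off to an in-memory BlockingQueue (for example by a record-mode listener) and consumed by a separate worker thread. The class and method names are hypothetical; the logic mirrors groupedWithin: return as soon as N elements are available, otherwise return whatever arrived before T elapsed.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class GroupedWithin {
    private GroupedWithin() {}

    // Collects up to maxSize elements, waiting at most maxWait in total.
    // Returns early once maxSize elements are collected; otherwise returns
    // whatever arrived before the deadline (possibly an empty list).
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, Duration maxWait) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + maxWait.toNanos();
        while (batch.size() < maxSize) {
            // Take whatever is already queued, without blocking.
            queue.drainTo(batch, maxSize - batch.size());
            if (batch.size() >= maxSize) {
                break;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // time window T exhausted
            }
            T next;
            try {
                // Block for the next element, but never past the deadline.
                next = queue.poll(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status for the caller
                break;
            }
            if (next == null) {
                break; // timed out waiting for more elements
            }
            batch.add(next);
        }
        return batch;
    }
}
```

One caveat with this design: if the listener returns as soon as it has queued a record, the container may commit that offset before the worker has processed the batch, so a crash can lose queued records unless acknowledgements are managed manually.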


Solution

  • It seems like fetch-min-size combined with fetch-max-wait should do it, but they compare bytes, not messages/records.

    That is correct; unfortunately, Kafka provides no mechanism such as fetch.min.records.

    I don't anticipate that Spring would layer this functionality on top of kafka-clients; it would be better to request it as a new feature in Kafka itself.

    Spring does not manipulate the records returned from the poll at all, except that you can now set subBatchPerPartition to receive batches containing records from just one partition, in order to properly support zombie fencing when using exactly-once read/process/write.
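Conceptually, with subBatchPerPartition the batch listener is invoked once per partition with only that partition's records from a poll, rather than once with the whole poll. The plain-Java sketch below only illustrates that splitting; it is not Spring's implementation, and the Rec and Tp types are hypothetical stand-ins for ConsumerRecord and TopicPartition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SubBatches {
    private SubBatches() {}

    // Hypothetical stand-in for a ConsumerRecord: just the fields needed here.
    record Rec(String topic, int partition, long offset) {}

    // Hypothetical stand-in for a TopicPartition, used as the grouping key.
    record Tp(String topic, int partition) {}

    // Splits one poll result into sub-batches, one per topic-partition,
    // keeping partitions in first-seen order and records in poll order.
    static List<List<Rec>> perPartition(List<Rec> pollResult) {
        Map<Tp, List<Rec>> byPartition = new LinkedHashMap<>();
        for (Rec r : pollResult) {
            byPartition.computeIfAbsent(new Tp(r.topic(), r.partition()), k -> new ArrayList<>()).add(r);
        }
        return new ArrayList<>(byPartition.values());
    }
}
```

Each inner list corresponds to one listener invocation, so any work done in that invocation (for example a transaction that also commits offsets) only ever involves a single partition.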