Search code examples
spring-bootspring-kafkaspring-cloud-stream

Spring Kafka Idle Between Polls


I've had a business requirement to slow down the consumption rate of a kafka consumer due to the impact it could possibly cause to other microservices. After a bit of research I've come to a solution being: max-poll-records to limit the number of records being consumed from each poll and setting idleBetweenPolls to regulate the interval at which each poll should be executed. Being mindful to not exceed the time stipulated in max-poll-interval-ms.

So I was able to set max-poll-records as a property and idleBetweenPolls as a container property like this:

factory.getContainerProperties().setIdleBetweenPolls(3000)

And it worked perfectly, but I've also came across the property spring.kafka.listener.idle-between-polls, and reading the property description it says: Sleep interval between Consumer. poll(Duration) calls. Except setting this property didn't work as expected, not sure if I'm missing some configuration or perhaps creating the ConcurrentKafkaListenerContainerFactory as a bean and setting a few of its properties is overriding the listener property?

Can someone help and elaborate on this?


Solution

  • The spring.kafka.listener.idle-between-polls property is mapped like this in Spring Boot:

    map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls);
    

    That is indeed done for the ConcurrentKafkaListenerContainerFactory, but only if:

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
    

    So, if you have your own kafkaListenerContainerFactory bean, then Spring Boot auto-configuration backs off and everything is in your hands. You can inject that KafkaProperties into your custom bean, though, but that's already a different story.

    For your use-case you might also consider to use a pause functionality: https://docs.spring.io/spring-kafka/reference/kafka/pause-resume.html