Search code examples
spring-bootapache-kafkakafka-consumer-apiapache-kafka-streams

How to limit Message consumption rate of Kafka Consumer in SpringBoot? (Kafka Stream)


I want to limit my Kafka Consumer message consumption rate to 1 Message per 10 seconds .I'm using kafka streams in Spring boot .

Following is the property I tried to Make this work but it didn't worked out s expected(Consumed many messages at once).

config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
         //
         config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);
         config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);

is there any way to Manually ACK(Manual offsetCommits) in KafkaStreams? which will be usefull to control the msg consumption rate .

Please note that i'm using Kstreams(KafkaStreams) Any help is really appreciated . :)


Solution

  • Finally, I achieved the desired message consumption limit using Thread.sleep(). Since there is no way to control the message consumption rate using Kafka config properties, I had to use my application code to control the rate of consumption.

    Example: If I want to set the record consumption rate to say, 4 msg per 10 seconds then I will consume 4 messages (keeping a count in parallel). Once 4 records are consumed, then I will make the thread sleep for 10 seconds and will repeat the same process over again.