Tags: java, spring, spring-boot, spring-kafka, concurrentlinkedqueue

Limit number of messages from Kafka in Spring Batch


I am storing the data from a KafkaListener in a ConcurrentLinkedQueue to be processed. Currently it consumes as much data as it can and completely fills up RAM. How do I limit the number of messages in the queue so that the KafkaListener pauses when the limit is reached?

ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<>();

@KafkaListener(
        topics = "topic",
        id = "topic-kafka-listener",
        groupId = "batch-processor",
        containerFactory = "kafkaListenerContainerFactory"
)
public void receive(@NotNull @Payload List<Message> messages) {
    queue.addAll(messages);
}

How do I limit the queue size to, say, 1 million?
Whenever the queue is polled and there is free space, it should start listening again.

OR

How do I limit the rate at which Kafka consumes messages to, say, 100,000 messages per second?


Solution

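  • Instead of using the @KafkaListener annotation, I used a KafkaConsumer object to poll for data manually. This gives more control: max.poll.records caps how many records a single poll returns, and the application decides when to poll.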

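    // Needs java.time.Duration, java.util.List, java.util.Map,
    // org.apache.kafka.clients.consumer.* and
    // org.apache.kafka.common.serialization.StringDeserializer
    Map<String, Object> consumerConfig = Map.of(
            "bootstrap.servers", "localhost:9092",
            "key.deserializer", StringDeserializer.class,
            // Note: StringDeserializer yields String values; to receive Message
            // objects, configure a deserializer that actually produces Message.
            "value.deserializer", StringDeserializer.class,
            "group.id", "batch-processor",
            // Upper bound on the number of records returned by one poll()
            "max.poll.records", 480000
    );
    KafkaConsumer<String, Message> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

    kafkaConsumer.subscribe(List.of("topic"));

    // Each call adds at most max.poll.records messages to the queue.
    public void receive() {
        ConsumerRecords<String, Message> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
        consumerRecords.forEach(record -> queue.add(record.value()));
    }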
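
Manual polling alone does not bound the queue, since ConcurrentLinkedQueue has no capacity limit. One possible way to get the "stop when full, continue when there is space" behaviour from the question is to buffer into a bounded LinkedBlockingQueue and fetch only as much as currently fits. The sketch below uses only the JDK so it can run on its own: BoundedIntake, fill and drain are hypothetical names, and a plain Iterator stands in for the Kafka consumer. It assumes a single producer thread calls fill.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical helper: a bounded buffer that never takes more than it has room for. */
class BoundedIntake<T> {
    private final BlockingQueue<T> queue;

    BoundedIntake(int capacity) {
        // LinkedBlockingQueue enforces the capacity; ConcurrentLinkedQueue is unbounded.
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Producer side: takes at most remainingCapacity() items from the source.
     * Returns how many items were buffered; 0 means the queue is full or the
     * source is empty, so the caller should not fetch more for now.
     * Assumes a single producer thread, so free space can only grow while this runs.
     */
    int fill(Iterator<T> source) {
        int room = queue.remainingCapacity();
        int added = 0;
        while (added < room && source.hasNext()) {
            if (!queue.offer(source.next())) {
                // Only reachable if a second producer is filling the queue concurrently.
                throw new IllegalStateException("queue filled by another producer");
            }
            added++;
        }
        return added;
    }

    /** Worker side: removes up to max items for processing, freeing space for the next fill. */
    List<T> drain(int max) {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, max);
        return batch;
    }

    int size() {
        return queue.size();
    }
}
```

With the KafkaConsumer above, the same check would sit in front of poll(): a single poll can return up to max.poll.records, so poll for new records only while queue.remainingCapacity() is at least that large. Skipping poll() for longer than max.poll.interval.ms gets the consumer removed from the group, so it is probably safer to call kafkaConsumer.pause(kafkaConsumer.assignment()) while the queue is full, keep polling (a paused consumer returns no records), and call kafkaConsumer.resume(kafkaConsumer.paused()) once the workers have drained enough.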