apache-kafka, kafka-consumer-api, spring-kafka

Spring Kafka MessageListenerContainer


I am reading Spring Kafka code and have some questions:

  1. If we use one @KafkaListener with 2 topics, does Spring Kafka create a single MessageListenerContainer? And if I use a separate @KafkaListener for each topic, are 2 MessageListenerContainers created?

  2. Does MessageListenerContainer mean consumer?

  3. If I set concurrency to 4 in ConcurrentKafkaListenerContainerFactory, does that mean that every @KafkaListener opens 4 threads to the broker, and that the coordinator sees them as 4 different consumers?

  4. How does polling work with @KafkaListener? Does it get only 1 ConsumerRecord from the broker each time?

Please help.


Solution

  • There are two implementations of MessageListenerContainer - the KafkaMessageListenerContainer (KMLC) and ConcurrentMessageListenerContainer (CMLC).

    The CMLC is simply a wrapper for one or more KMLCs, with the number of KMLCs specified by the concurrency.

    @KafkaListener always uses a CMLC.

    Each KMLC gets one Consumer (and one thread). The thread continually poll()s the consumer, with the specified pollTimeout.

    How the topics/partitions are distributed across the KMLCs depends on:

    • how many partitions the topic(s) have
    • the consumer's partition.assignment.strategy property

    If you have multiple topics, each with fewer partitions than the concurrency, you will likely need an alternate partition assignor, such as the round robin assignor. The default range assignor distributes each topic's partitions separately, so without this change you will have idle containers with no assignment.
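
The effect of the assignor choice can be illustrated with a small standalone model. This is only a sketch in plain Java, not Kafka's own assignor code: the class `AssignorSketch`, its methods, and the topic names are hypothetical, and it assumes every consumer subscribes to the same topics. It compares a range-style assignment (each topic divided on its own) with a round-robin assignment (all partitions dealt out in one pass) for 3 topics of 2 partitions and a concurrency of 4.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of two assignment strategies; not the Kafka implementation.
class AssignorSketch {

    // Range-style: each topic is divided among the consumers independently,
    // always starting again from consumer 0.
    static Map<Integer, List<String>> range(Map<String, Integer> topics, int consumers) {
        Map<Integer, List<String>> result = empty(consumers);
        for (Map.Entry<String, Integer> topic : topics.entrySet()) {
            int partitions = topic.getValue();
            int perConsumer = partitions / consumers;
            int extra = partitions % consumers; // the first 'extra' consumers get one more
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                int count = perConsumer + (c < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    result.get(c).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return result;
    }

    // Round-robin: all partitions of all topics are dealt out in a single cycle.
    static Map<Integer, List<String>> roundRobin(Map<String, Integer> topics, int consumers) {
        Map<Integer, List<String>> result = empty(consumers);
        int c = 0;
        for (Map.Entry<String, Integer> topic : topics.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                result.get(c).add(topic.getKey() + "-" + p);
                c = (c + 1) % consumers;
            }
        }
        return result;
    }

    // Number of consumers that received no partitions at all.
    static long idle(Map<Integer, List<String>> assignment) {
        return assignment.values().stream().filter(List::isEmpty).count();
    }

    private static Map<Integer, List<String>> empty(int consumers) {
        Map<Integer, List<String>> result = new LinkedHashMap<>();
        for (int c = 0; c < consumers; c++) {
            result.put(c, new ArrayList<>());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("topic-a", 2);
        topics.put("topic-b", 2);
        topics.put("topic-c", 2);
        System.out.println("range:       " + range(topics, 4));
        System.out.println("round robin: " + roundRobin(topics, 4));
    }
}
```

In this model the range-style assignment gives all six partitions to consumers 0 and 1 and leaves consumers 2 and 3 idle, while the round-robin assignment gives every consumer at least one partition.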

    1. That is correct; if you explicitly want a different container for each topic, you can provide multiple @KafkaListener annotations on the same method.
    2. Not exactly: each KMLC has one Consumer, and a CMLC wraps one or more KMLCs. See my explanation above.
    3. That is correct - it's the only way to get concurrency with Kafka (without adding very complicated logic to manage offsets).
    4. The number of records returned by each poll depends on several consumer properties, including max.poll.records, fetch.min.bytes, and fetch.max.wait.ms. A single poll can therefore return many records, not just one.
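
The points above could be wired together roughly as follows. This is a configuration sketch rather than a definitive setup: it needs spring-kafka and kafka-clients on the classpath, and the broker address, group id, topic names, class names, and property values are all assumptions chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;

@Configuration
@EnableKafka
class ListenerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // hypothetical group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Spread partitions of all topics over all consumers (avoids idle containers).
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RoundRobinAssignor.class.getName());
        // Properties that influence how many records one poll() returns (example values).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(4); // the CMLC wraps 4 KMLCs, each with its own Consumer and thread
        factory.getContainerProperties().setPollTimeout(3000); // timeout passed to each poll(), in ms
        return factory;
    }
}

@Component
class DemoListener {

    // One annotation with two topics: a single CMLC consuming from both topics.
    @KafkaListener(topics = {"topic-a", "topic-b"})
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println(record.topic() + "-" + record.partition() + ": " + record.value());
    }
}
```

With a record listener like this one, the container calls the method once for each record in the batch returned by a poll, so receiving a single ConsumerRecord per invocation does not mean that only one record was fetched from the broker.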