apache-kafka, kafka-consumer-api, spring-kafka

Check which partition and which consumer handled a message in Kafka


I have a Java Spring Boot Kafka consumer (spring-kafka) that consumes from a topic. The topic has 3 partitions, and the consumer code is below.

@KafkaListener(topics = "myTopic", groupId = "myGroup", autoStartup = "true", concurrency = "3")
public void consume(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                    @Header("custom_header_1") String customHeader1,
                    @Header("custom_header_2") String customHeader2,
                    @Header("custom_header_3") String customHeader3,
                    @Header(required = false, name = KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                    @Payload(required = false) String message) {

    log.info("-------------------------");
    log.info(key);
    log.info(message);
    log.info("-------------------------");

}

I set concurrency to 3 because my topic has 3 partitions. The messages have different keys; each key is the id of an entity in my application.

As I understand it, because concurrency is 3, the listener will run 3 threads acting as separate consumers when it starts, rather than a single consumer consuming all the messages.

What I want to know is which consumer consumes from which partition. Is there a way to see, among the 3 consumers, which one consumed which message? For example, suppose my partitions are myTopic0, myTopic1 and myTopic2, and my consumers are consumer0, consumer1 and consumer2. Can I determine programmatically that messageX was consumed from partition myTopic1 by consumer1?


Solution

  • Thread.currentThread().getName() returns the name of the consumer thread, which contains the listener id followed by -n, where n is 0, 1 or 2. Give the listener an id instead of letting the framework generate one.

    (Your log messages will also include the thread name if you have %t in your log pattern.)
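
As a rough illustration of the thread-name approach, here is a plain-Java sketch (no Spring required) that pulls the consumer index out of a thread name. The class ConsumerThreadInfo and its methods are hypothetical helpers, not part of spring-kafka. The sketch assumes the listener was given id = "myListener" and that the container names its threads like myListener-1-C-1, which should be checked against your own logs. The partition is passed in as a plain int; in the listener it could probably be obtained with an extra parameter such as @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition (the constant is named RECEIVED_PARTITION in newer spring-kafka versions).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of spring-kafka.
class ConsumerThreadInfo {

    // Assumed thread-name shape: <listenerId>-<n>-C-<m>, e.g. "myListener-1-C-1".
    private static final Pattern THREAD_NAME = Pattern.compile("^(.+)-(\\d+)-C-\\d+$");

    // Returns the consumer index n, or -1 if the name does not match the assumed shape.
    static int consumerIndex(String threadName) {
        Matcher m = THREAD_NAME.matcher(threadName);
        return m.matches() ? Integer.parseInt(m.group(2)) : -1;
    }

    // Builds one log line tying a record's key and partition to the consuming thread.
    static String describe(String threadName, int partition, String key) {
        return String.format("key=%s partition=%d consumer=%d thread=%s",
                key, partition, consumerIndex(threadName), threadName);
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates one listener thread; inside a real listener method you would call
        // describe(Thread.currentThread().getName(), partition, key) directly.
        Thread t = new Thread(
                () -> System.out.println(
                        describe(Thread.currentThread().getName(), 1, "entity-42")),
                "myListener-1-C-1");
        t.start();
        t.join();
        // prints: key=entity-42 partition=1 consumer=1 thread=myListener-1-C-1
    }
}
```

Parsing the name is only needed if you want the index as a number; logging the raw thread name next to the partition already answers which consumer handled which message.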