
How to get the consumer-id in a Spring Boot 2 @KafkaListener consumer


Consumers can be listed with the bin/kafka-consumer-groups.sh utility:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer

Is it possible to get the consumer-id, as shown in the output of the above command, from within a Spring Boot @KafkaListener consumer?
I want to store the consumer-id in a table to identify the processor that processed the data.
I have gone through @gary-russell's answer on "How to get kafka consumer-id for logging", but I don't see the consumer-id appearing in the "partitions assigned" logs:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer.info - my-consumer: partitions assigned: [test_topic-7, test_topic-6, test_topic-5, test_topic-4]
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer.info - my-consumer: partitions assigned: [test_topic-3, test_topic-2, test_topic-1, test_topic-0]

I am using Spring Boot 2.2.2 with the spring-kafka dependency.


Solution

  • I don't think the consumer-id is available on the client, but you can get the client-ids from the listener container's metrics:

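    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.config.TopicBuilder;

    @SpringBootApplication
    public class So61616543Application {

        public static void main(String[] args) {
            // Demo only: the context is closed as soon as the runner has finished.
            SpringApplication.run(So61616543Application.class, args).close();
        }

        // The listener id doubles as the container id used for the registry lookup below.
        @KafkaListener(id = "so61616543", topics = "so61616543")
        public void listen(String in) {
            System.out.println(in);
        }

        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so61616543").partitions(1).replicas(1).build();
        }

        @Bean
        public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
            return args -> {
                // metrics() is keyed by client-id; each value maps metric names to metrics.
                registry.getListenerContainer("so61616543").metrics()
                    .forEach((clientId, metrics) -> {
                        System.out.println(clientId);
                        metrics.forEach((key, value) -> System.out.println(key + ":" + value));
                    });
            };
        }

    }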
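
To relate the client-ids printed above to the CONSUMER-ID column of kafka-consumer-groups.sh, something like the following might help. It is only a minimal sketch, and it assumes that the broker builds each consumer-id as the client-id followed by a hyphen and a random UUID. That layout is an assumption here and is not confirmed by the answer above. The class name ConsumerIds and the sample id are hypothetical.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class ConsumerIds {

    // Assumed consumer-id layout: "<client-id>-<36-character UUID>".
    private static final Pattern MEMBER_ID = Pattern.compile(
            "(.+)-([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})");

    private ConsumerIds() {
    }

    /** Returns the client-id prefix of a consumer-id, or empty if the layout does not match. */
    public static Optional<String> clientIdOf(String consumerId) {
        Matcher m = MEMBER_ID.matcher(consumerId);
        return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical value, shaped like an entry in the CONSUMER-ID column.
        String consumerId = "consumer-my-consumer-1-5c2f1b9e-7a43-4d0e-9f6b-2b1c3d4e5f60";
        // prints "consumer-my-consumer-1"
        System.out.println(clientIdOf(consumerId).orElse("<unrecognised>"));
    }
}
```

If the assumption holds, the extracted prefix can be compared with the keys returned by metrics() to tell which listener container a row of the CLI output belongs to. Strings that do not end in a UUID yield an empty Optional rather than a guess.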