Search code examples
spring-kafkaapache-kafka-streams

Adding custom static tags to Spring Kafka consumer metrics


My application consumes the same topic name from 2 different clusters. The only metric tag that appears is "spring.id"; I need to add more tags, for example the cluster name.

I tried to implement this: https://docs.spring.io/spring-kafka/docs/current/reference/html/#micrometer:~:text=4.1.12.%20Monitoring-,Monitoring%20Listener%20Performance,-Starting%20with%20version

but I could not find a way to do it.


Solution

  • See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#micrometerTags. You can add static tags using the micrometerTags property and dynamic tags using the micrometerTagsProvider property.

    EDIT

    /**
     * Sample application that attaches the tags supplied by {@link ClusterTag}
     * to both the {@code KafkaTemplate} and the listener containers through
     * their Micrometer tags providers.
     */
    @SpringBootApplication
    public class So77408974Application {

        public static void main(String[] args) {
            SpringApplication.run(So77408974Application.class, args);
        }

        /**
         * Prints every payload received from the {@code so77408974} topic.
         *
         * @param in the received payload
         */
        @KafkaListener(id = "so77408974", topics = "so77408974")
        void listen(String in) {
            System.out.println(in);
        }

        /**
         * Declares the {@code so77408974} topic with one partition and one replica.
         *
         * @return the topic definition
         */
        @Bean
        NewTopic topic() {
            return TopicBuilder.name("so77408974")
                    .partitions(1)
                    .replicas(1)
                    .build();
        }

        /**
         * Installs the tags provider on the template, then returns a runner that
         * sends a single {@code "foo"} record to the topic.
         *
         * @param template the template to configure and send with
         * @param tag source of the tags handed to the provider
         * @return the runner performing the send
         */
        @Bean
        ApplicationRunner runner(KafkaTemplate<String, String> template, ClusterTag tag) {
            // The provider ignores the record; the same tags are returned for every send.
            template.setMicrometerTagsProvider(unused -> tag.getClusterTag());
            return appArgs -> template.send("so77408974", "foo");
        }

        /**
         * Builds a customizer that installs the tags provider on each container's
         * properties, and registers that customizer on the given factory.
         *
         * @param factory the listener container factory receiving the customizer
         * @param tag source of the tags handed to the provider
         * @return the customizer that was registered on the factory
         */
        @Bean
        ContainerCustomizer<String, String, ConcurrentMessageListenerContainer<String, String>> containerCustomizer(
                ConcurrentKafkaListenerContainerFactory<String, String> factory, ClusterTag tag) {

            ContainerCustomizer<String, String, ConcurrentMessageListenerContainer<String, String>> tagging =
                    container -> container.getContainerProperties()
                            .setMicrometerTagsProvider(unused -> tag.getClusterTag());
            factory.setContainerCustomizer(tagging);
            return tagging;
        }

    }
    
    /**
     * Supplies a single-entry tag map, {@code cluster.id -> <id reported by KafkaAdmin>},
     * for use as Micrometer tags. The map is created lazily on first successful
     * lookup and then reused.
     */
    @Component
    class ClusterTag {

        private static final String TAG_NAME = "cluster.id";

        /** Value used while the admin cannot report a cluster id. */
        private static final String UNKNOWN_CLUSTER = "unknown";

        private final KafkaAdmin admin;

        // volatile: the tags provider may be invoked from threads other than the
        // one that populated the cache, so the cached map must be safely published.
        private volatile Map<String, String> clusterTag;

        public ClusterTag(KafkaAdmin admin) {
            this.admin = admin;
        }

        /**
         * Returns the cluster tag, resolving the cluster id on first use.
         *
         * <p>{@code KafkaAdmin.clusterId()} may return {@code null} when the id
         * cannot be obtained; {@code Map.of} rejects null values, so in that case
         * a placeholder tag is returned instead of throwing. The placeholder is
         * not cached, so the next call asks the admin again.
         *
         * @return an immutable map with the single key {@code cluster.id}
         */
        public Map<String, String> getClusterTag() {
            Map<String, String> cached = this.clusterTag;
            if (cached != null) {
                return cached;
            }
            String clusterId = this.admin.clusterId();
            if (clusterId == null) {
                return Map.of(TAG_NAME, UNKNOWN_CLUSTER);
            }
            Map<String, String> resolved = Map.of(TAG_NAME, clusterId);
            this.clusterTag = resolved;
            return resolved;
        }

    }
    

    Note: with Spring for Apache Kafka versions earlier than 3.0, you need to call AdminClient.describeCluster() and read the cluster id from its result; the KafkaAdmin.clusterId() method was added in 3.0.