My applicaiton consums same topic name from 2 different clusters The only metric tag appearing is "spring.id", needed to add more tags - for example cluster name
tried to implement this https://docs.spring.io/spring-kafka/docs/current/reference/html/#micrometer:~:text=4.1.12.%20Monitoring-,Monitoring%20Listener%20Performance,-Starting%20with%20version
but could not find the way
See the documentation https://docs.spring.io/spring-kafka/docs/current/reference/html/#micrometerTags you can add static tags using the micrometerTags
properties and dynamic tags using the micrometerTagsProvider
properties.
EDIT
@SpringBootApplication
public class So77408974Application {
public static void main(String[] args) {
SpringApplication.run(So77408974Application.class, args);
}
@KafkaListener(id = "so77408974", topics = "so77408974")
void listen(String in) {
System.out.println(in);
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so77408974").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template, ClusterTag tag) {
template.setMicrometerTagsProvider(rec -> tag.getClusterTag());
return args -> {
template.send("so77408974", "foo");
};
}
@Bean
ContainerCustomizer<String, String, ConcurrentMessageListenerContainer<String, String>> containerCustomizer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, ClusterTag tag) {
ContainerCustomizer<String, String, ConcurrentMessageListenerContainer<String, String>> cust = container -> {
container.getContainerProperties().setMicrometerTagsProvider(rec -> tag.getClusterTag());
};
factory.setContainerCustomizer(cust);
return cust;
}
}
@Component
class ClusterTag {
private final KafkaAdmin admin;
private Map<String, String> clusterTag;
public ClusterTag(KafkaAdmin admin) {
this.admin = admin;
}
public Map<String, String> getClusterTag() {
if (this.clusterTag == null) {
this.clusterTag = Map.of("cluster.id", this.admin.clusterId());
}
return this.clusterTag;
}
}
Note: when using versions less than 3.0, you need to use AdminClient.describeCluster()
to get the cluster id; the KafkaAdmin
method was added in 3.0.