spring-boot, spring-kafka, spring-boot-actuator, micrometer, spring-micrometer

Meter registration fails on Spring Boot Kafka consumer with Prometheus MeterRegistry


I am investigating a bug report in our Spring Boot application regarding the Kafka metric kafka.consumer.fetch.manager.records.consumed.total being missing.

The application has two Kafka consumers, let's call them the query-routing and query-tracking consumers. They are configured via the @KafkaListener annotation, and each consumer has its own instance of ConcurrentKafkaListenerContainerFactory.

The query-routing consumer is configured as:

@Configuration
@EnableKafka
public class QueryRoutingConfiguration {
    
    @Bean(name = "queryRoutingContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, RoutingInfo> kafkaListenerContainerFactory(MeterRegistry meterRegistry) {
        
        Map<String, Object> consumerConfigs = new HashMap<>();
        // For brevity I removed the configs as they are trivial configs like bootstrap servers and serializers

        DefaultKafkaConsumerFactory<String, RoutingInfo> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerConfigs);
        
        consumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));

        ConcurrentKafkaListenerContainerFactory<String, RoutingInfo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setIdleEventInterval(5000L);

        return factory;
    }
}

And the query-tracking consumer is configured as:

@Configuration
@EnableKafka
public class QueryTrackingConfiguration {

    private static final FixedBackOff NO_ATTEMPTS = new FixedBackOff(Duration.ofSeconds(0).toMillis(), 0L);
    

    @Bean(name = "queryTrackingContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, QueryTrackingMessage> kafkaListenerContainerFactory(MeterRegistry meterRegistry) {

        Map<String, Object> consumerConfigs = new HashMap<>();
        // For brevity I removed the configs as they are trivial configs like bootstrap servers and serializers
        DefaultKafkaConsumerFactory<String, QueryTrackingMessage> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerConfigs);

        consumerFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));

        ConcurrentKafkaListenerContainerFactory<String, QueryTrackingMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setBatchListener(true);

        DefaultErrorHandler deusErrorHandler = new DefaultErrorHandler(NO_ATTEMPTS);
        factory.setCommonErrorHandler(deusErrorHandler);

        return factory;
    }
}

The MeterRegistryConfigurator bean configuration is set as:

@Configuration
public class MeterRegistryConfigurator {
    private static final Logger LOG = LoggerFactory.getLogger(MeterRegistryConfigurator.class);
    private static final String PREFIX = "dps";

    @Bean
    MeterRegistryCustomizer<MeterRegistry> meterRegistryCustomizer() {
        return registry -> registry.config()
            .onMeterAdded(meter -> LOG.info("onMeterAdded: {}", meter.getId().getName()))
            .onMeterRemoved(meter -> LOG.info("onMeterRemoved: {}", meter.getId().getName()))
            .onMeterRegistrationFailed(
                (id, s) -> LOG.info("onMeterRegistrationFailed - id '{}' value '{}'", id.getName(), s))
            .meterFilter(PrefixMetricFilter.withPrefix(PREFIX))
            .meterFilter(
                MeterFilter.deny(id ->
                    id.getName().startsWith(PREFIX + ".jvm")
                        || id.getName().startsWith(PREFIX + ".system")
                        || id.getName().startsWith(PREFIX + ".process")
                        || id.getName().startsWith(PREFIX + ".logback")
                        || id.getName().startsWith(PREFIX + ".tomcat"))
            )
            .meterFilter(MeterFilter.ignoreTags("host", "host.name"))
            .namingConvention(NamingConvention.snakeCase);
    }
}

The @KafkaListener for each consumer is declared as:

@KafkaListener(
    id = "query-routing",
    idIsGroup = true,
    topics = "${query-routing.consumer.topic}",
    groupId = "${query-routing.consumer.groupId}",
    containerFactory = "queryRoutingContainerFactory")
public void listenForMessages(ConsumerRecord<String, RoutingInfo> record) {
    // Handle each record ...
}

and

@KafkaListener(
    id = "query-tracking",
    idIsGroup = true,
    topics = "${query-tracking.consumer.topic}",
    groupId = "${query-tracking.consumer.groupId}",
    containerFactory = "queryTrackingContainerFactory"
)
public void listenForMessages(List<ConsumerRecord<String, QueryTrackingMessage>> consumerRecords, Acknowledgment ack) {
    // Handle each record ...
}

When the application starts up and I go to the actuator/prometheus endpoint, I can see the metric for both consumers:

# HELP dps_kafka_consumer_fetch_manager_records_consumed_total The total number of records consumed
# TYPE dps_kafka_consumer_fetch_manager_records_consumed_total counter
dps_kafka_consumer_fetch_manager_records_consumed_total{client_id="consumer-qf-query-tracking-consumer-1",kafka_version="3.1.2",spring_id="not.managed.by.Spring.consumer-qf-query-tracking-consumer-1",} 7.0
dps_kafka_consumer_fetch_manager_records_consumed_total{client_id="consumer-QF-Routing-f5d0d9f1-e261-407b-954d-5d217211dee0-2",kafka_version="3.1.2",spring_id="not.managed.by.Spring.consumer-QF-Routing-f5d0d9f1-e261-407b-954d-5d217211dee0-2",} 0.0

But a few seconds later there is a new call to io.micrometer.core.instrument.binder.kafka.KafkaMetrics#checkAndBindMetrics, which removes a set of metrics (including kafka.consumer.fetch.manager.records.consumed.total):

onMeterRegistrationFailed - dps.kafka.consumer.fetch.manager.records.consumed.total string Prometheus requires that all meters with the same name have the same set of tag keys. There is already an existing meter named 'dps.kafka.consumer.fetch.manager.records.consumed.total' containing tag keys [client_id, kafka_version, spring_id]. The meter you are attempting to register has keys [client_id, kafka_version, spring_id, topic].

Going again to actuator/prometheus will only show the metric for the query-routing consumer:

# HELP deus_dps_persistence_kafka_consumer_fetch_manager_records_consumed_total The total number of records consumed for a topic
# TYPE deus_dps_persistence_kafka_consumer_fetch_manager_records_consumed_total counter
deus_dps_persistence_kafka_consumer_fetch_manager_records_consumed_total{client_id="consumer-QF-Routing-0a739a21-4764-411a-9cc6-0e60293b40b4-2",kafka_version="3.1.2",spring_id="not.managed.by.Spring.consumer-QF-Routing-0a739a21-4764-411a-9cc6-0e60293b40b4-2",theKey="routing",topic="QF_query_routing_v1",} 0.0

As you can see above, the metric for the query-tracking consumer is gone. As the log says, the meter being registered has the keys [client_id, kafka_version, spring_id, topic]. The issue is that I cannot find where this metric with a topic key is being registered, which triggers io.micrometer.core.instrument.binder.kafka.KafkaMetrics#checkAndBindMetrics and removes the metric for the query-tracking consumer.

I am using

  • micrometer-registry-prometheus version 1.9.5
  • spring boot version 2.7.5
  • spring kafka (org.springframework.kafka:spring-kafka)

My question is: why does the metric kafka.consumer.fetch.manager.records.consumed.total fail to register, causing it to be removed for the query-tracking consumer, and how can I fix it?


Solution

  • I believe this is internal to Micrometer's KafkaMetrics.

    Periodically, it checks for new metrics; presumably, the topic one shows up after the consumer subscribes to the topic.

    @Override
    public void bindTo(MeterRegistry registry) {
        this.registry = registry;

        commonTags = getCommonTags(registry);
        prepareToBindMetrics(registry);
        checkAndBindMetrics(registry);

        // This scheduled re-check is what later picks up the per-topic meters
        // and triggers the registration failure seen in the logs:
        scheduler.scheduleAtFixedRate(() -> checkAndBindMetrics(registry), getRefreshIntervalInMillis(),
                getRefreshIntervalInMillis(), TimeUnit.MILLISECONDS);
    }
    

    You should be able to write a meter filter that excludes the variant with fewer tags (the one without the topic tag); a sketch follows.
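
    For example, something along these lines might work (an untested sketch, not a verified fix; the class and bean names are made up for illustration). It denies the topic-less records-consumed meter so that only the per-topic variant, which carries the extra topic tag key, is registered:

    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.config.MeterFilter;
    import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class KafkaRecordsConsumedMeterFilter {

        @Bean
        MeterRegistryCustomizer<MeterRegistry> denyTopicLessRecordsConsumed() {
            return registry -> registry.config().meterFilter(
                // Deny the aggregate (topic-less) variant of the records-consumed meter so it
                // no longer clashes with the per-topic variant that has the extra "topic" tag key.
                // endsWith() matches whether or not the "dps" prefix from PrefixMetricFilter
                // has already been applied when this filter runs.
                MeterFilter.deny(id ->
                    id.getName().endsWith("kafka.consumer.fetch.manager.records.consumed.total")
                        && id.getTag("topic") == null));
        }
    }

    Because the order in which separate MeterRegistryCustomizer beans apply their meter filters is not guaranteed, the name check above is deliberately prefix-agnostic.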