Search code examples
javaapache-kafkaspring-kafkaobservability

Spring Boot Kafka Micrometer observability doesn't work with concurrency enabled


I configured my application to load beans based on properties in this way:

        @Bean
        @ConditionalOnProperty(
            prefix = "fleexi.kafka.override-concurrency",
            value = {"enabled"}
        )
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryWithConcurrency(ConsumerFactory<String, String> kafkaConsumerFactory, FleexiKafkaProperties fleexiKafkaProperties) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConcurrency(fleexiKafkaProperties.getOverrideConcurrency().getListenerConcurrency());
            factory.getContainerProperties().setObservationEnabled(true);
            factory.setConsumerFactory(kafkaConsumerFactory);
            return factory;
        }

        @Bean
        @ConditionalOnMissingBean({ConcurrentKafkaListenerContainerFactory.class})
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> kafkaConsumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
            factory.getContainerProperties().setObservationEnabled(true);
            factory.setConsumerFactory(kafkaConsumerFactory);
            return factory;
        }

In the first case, with concurrency enabled observabilities log traceId and spanId doesn't work.

enter image description here

In second case, without concurrency it works properly enter image description here

Concurrency breaks observability?

EDIT: The example here: https://github.com/claudiomerli/demo-issue-spring-kafka-3459


Solution

  • Your problem is here:

     ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryWithConcurrency(
    

    where the default containerFactory for the @KafkaListener is indeed that kafkaListenerContainerFactory.

    So, since your custom kafkaListenerContainerFactoryWithConcurrency is out of use, that is not a surprise that traces are not logged.

    Try just this configuration properties instead of those beans:

    spring.kafka.consumer.group-id=TEST
    spring.kafka.listener.concurrency=10
    spring.kafka.listener.observation-enabled=true
    

    UPDATE

    Since you cannot rely on the auto-configuration and have to use those conditional beans you can trick it like this:

        @Bean("kafkaListenerContainerFactory")
        @ConditionalOnProperty(
            prefix = "fleexi.kafka.override-concurrency",
            value = {"enabled"}
        )
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryWithConcurrency(ConsumerFactory<String, String> kafkaConsumerFactory, FleexiKafkaProperties fleexiKafkaProperties) {
    

    With that default factory bean name expected by the @KafkaListener.