Search code examples
spring-kafka

metrics method of MessageListenerContainer is not capturing the right values


I am using spring-kafka 2.2.8 to create a batch consumer, and I am trying to capture my container's metrics to understand the performance of the batch consumer.

/**
 * Exposes the consumer factory bean used by the listener container factory.
 *
 * <p>The factory is created from {@link #consumerConfigs()} together with the
 * key deserializer from {@code stringKeyDeserializer()} and the value
 * deserializer from {@code avroValueDeserializer()}.
 *
 * @return a new {@code DefaultKafkaConsumerFactory}
 */
@Bean
public ConsumerFactory consumerFactory() {
    // Resolve the configuration first so the call order matches the constructor's argument order.
    final Map<String, Object> consumerProperties = consumerConfigs();
    return new DefaultKafkaConsumerFactory(
            consumerProperties,
            stringKeyDeserializer(),
            avroValueDeserializer());
}


/**
 * Exposes a fixed back-off policy bean with a back-off period of 100.
 *
 * <p>NOTE(review): nothing in the visible configuration wires this bean into
 * the listener container factory — confirm where it is consumed.
 *
 * @return the configured {@code FixedBackOffPolicy}
 */
@Bean
public FixedBackOffPolicy getBackOffPolicy() {
    final FixedBackOffPolicy fixedPolicy = new FixedBackOffPolicy();
    fixedPolicy.setBackOffPeriod(100);
    return fixedPolicy;
}
   
   
   

/**
 * Exposes the container factory referenced by {@code @KafkaListener}
 * endpoints that receive records in batches.
 *
 * <p>The factory uses {@link #consumerFactory()}, is switched to batch-listener
 * mode, and has stateful retry enabled.
 *
 * <p>NOTE(review): no retry template is set in the visible code; presumably
 * the stateful-retry flag has no effect without one — confirm.
 *
 * @return the configured {@code ConcurrentKafkaListenerContainerFactory}
 */
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaBatchListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory batchFactory =
            new ConcurrentKafkaListenerContainerFactory();

    batchFactory.setConsumerFactory(consumerFactory());
    // Deliver each poll result to the listener as a single list of records.
    batchFactory.setBatchListener(true);
    batchFactory.setStatefulRetry(true);

    return batchFactory;
}


/**
 * Prepares the Kafka consumer settings for the batch consumer.
 *
 * <p>Each tunable is looked up with {@code environment.getProperty(key, fallback)}
 * and pushed onto {@code batchConsumerConfigProperties} through its setters.
 *
 * <p>NOTE(review): the method is declared to return a {@code Map<String, Object>},
 * but the visible body never writes to {@code configs} and has no {@code return}
 * statement. Presumably the values held by {@code batchConsumerConfigProperties}
 * are meant to be copied into {@code configs} and returned — confirm against the
 * full source.
 *
 * @return the consumer configuration map (see the review note above)
 */
public Map<String, Object> consumerConfigs(){

        // Declared here but not populated anywhere in the visible code.
        Map<String, Object> configs = new HashMap<>();

        // Deserializer setup: String keys, Avro values, specific-reader flag set to "true".
        batchConsumerConfigProperties.setKeyDeserializerClassConfig();
        batchConsumerConfigProperties.setValueDeserializerClassConfig();
        batchConsumerConfigProperties.setKeyDeserializerClass(StringDeserializer.class);
        batchConsumerConfigProperties.setValueDeserializerClass(KafkaAvroDeserializer.class);
        batchConsumerConfigProperties.setSpecificAvroReader("true");

        // Offset handling: falls back to "earliest" reset and auto-commit "false" when the properties are absent.
        batchConsumerConfigProperties.setAutoOffsetResetConfig(environment.getProperty("sapphire.kes.consumer.auto.offset.reset", "earliest"));
        batchConsumerConfigProperties.setEnableAutoCommitConfig(environment.getProperty("sapphire.kes.consumer.enable.auto.commit", "false"));

        // Remaining tunables; the *_CONFIG keys are presumably static imports of Kafka's ConsumerConfig constants — TODO confirm.
        batchConsumerConfigProperties.setMaxPollIntervalMs(environment.getProperty(MAX_POLL_INTERVAL_MS_CONFIG, "300000"));
        batchConsumerConfigProperties.setMaxPollRecords(environment.getProperty(MAX_POLL_RECORDS_CONFIG, "50000"));
        batchConsumerConfigProperties.setSessionTimeoutms(environment.getProperty(SESSION_TIMEOUT_MS_CONFIG, "10000"));
        batchConsumerConfigProperties.setRequestTimeOut(environment.getProperty(REQUEST_TIMEOUT_MS_CONFIG, "30000"));
        batchConsumerConfigProperties.setHeartBeatIntervalMs(environment.getProperty(HEARTBEAT_INTERVAL_MS_CONFIG, "3000"));
        batchConsumerConfigProperties.setFetchMinBytes(environment.getProperty(FETCH_MIN_BYTES_CONFIG, "1"));
        batchConsumerConfigProperties.setFetchMaxBytes(environment.getProperty(FETCH_MAX_BYTES_CONFIG, "52428800"));
        batchConsumerConfigProperties.setFetchMaxWaitMS(environment.getProperty(FETCH_MAX_WAIT_MS_CONFIG, "500"));
        batchConsumerConfigProperties.setMaxPartitionFetchBytes(environment.getProperty(MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"));
        batchConsumerConfigProperties.setConnectionsMaxIdleMs(environment.getProperty(CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000"));
        batchConsumerConfigProperties.setAutoCommitIntervalMS(environment.getProperty(AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"));
        batchConsumerConfigProperties.setReceiveBufferBytes(environment.getProperty(RECEIVE_BUFFER_CONFIG, "65536"));
        batchConsumerConfigProperties.setSendBufferBytes(environment.getProperty(SEND_BUFFER_CONFIG, "131072"));
}

Here is my consumer code where I'm trying to capture the container metrics

@Component
public class MyBatchConsumer {

    private final KafkaListenerEndpointRegistry registry;
    
    @Autowired
     public MyBatchConsumer(KafkaListenerEndpointRegistry registry) {
            this.registry = registry;
    }

@KafkaListener(topics = "myTopic", containerFactory = "kafkaBatchListenerContainerFactory", id = "myBatchConsumer")
    public void consumeRecords(List<ConsumerRecord> messages) {

        System.out.println("messages size - " + messages.size());

        if(mybatchconsumerMessageCount == 0){
            ConsumerPerfTestingConstants.batchConsumerStartTime = System.currentTimeMillis();
            ConsumerPerfTestingConstants.batchConsumerStartDateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss"));
        }

        mybatchconsumerMessageCount = mybatchconsumerMessageCount + messages.size());
        System.out.println("\n\n\n batchConsumerConsumedMessages " + mybatchconsumerMessageCount);

        if (mybatchconsumerMessageCount == targetMessageCount) {
            System.out.println("ATTENTION! ATTENTION! ATTENTION! Consumer Finished processing " + messageCount + " messages");


            registry.getListenerContainerIds().forEach(
                    listenerId -> System.out.println(" kes batch consumer listenerId is "+listenerId)
            );

            String listenerID = registry.getListenerContainerIds().stream().filter(listenerId -> listenerId.startsWith("myBatchConsumer")).findFirst().get();

            System.out.println(" kes batch consumer listenerID is "+listenerID);

            Map<String, Map<MetricName, ? extends Metric>> metrics = registry.getListenerContainer(listenerID).metrics();
            registry.getListenerContainer(listenerID).stop();
            
            System.out.println("metrics - "+metrics);
        }
    }
}

Now, when I consume 10 records and look at the metrics, I see the values below and I am not sure why. Can someone help me understand what I am missing here?

records-consumed-total = 0
records-consumed-rate = 0

Solution

  • This works fine for me; I am using 2.6.2, but the container simply delegates to the consumer when calling metrics.

    @SpringBootApplication
    public class So64878927Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So64878927Application.class, args);
        }
    
        @Autowired
        KafkaListenerEndpointRegistry registry;
    
        @KafkaListener(id = "so64878927", topics = "so64878927")
        void listen(List<String> in) {
            System.out.println(in);
            Map<String, Map<MetricName, ? extends Metric>> metrics = registry.getListenerContainer("so64878927").metrics();
            System.out.println("L: " + metrics.get("consumer-so64878927-1").entrySet().stream()
                    .filter(entry -> entry.getKey().name().startsWith("records-consumed"))
                    .map(entry -> entry.getValue().metricName().name() + " = " + entry.getValue().metricValue())
                    .collect(Collectors.toList()));
    
            registry.getListenerContainer("so64878927").stop(() -> System.out.println("Stopped"));
        }
    
        @Bean
        NewTopic topic() {
            return TopicBuilder.name("so64878927").build();
        }
    
        @EventListener
        void idleEvent(ListenerContainerIdleEvent event) {
            Map<String, Map<MetricName, ? extends Metric>> metrics = registry.getListenerContainer("so64878927").metrics();
            System.out.println("I: " + metrics.get("consumer-so64878927-1").entrySet().stream()
                    .filter(entry -> entry.getKey().name().startsWith("records-consumed"))
                    .map(entry -> entry.getValue().metricName().name() + " = " + entry.getValue().metricValue())
                    .collect(Collectors.toList()));
        }
    
    }
    
    spring.kafka.listener.type=batch
    spring.kafka.listener.idle-event-interval=6000
    
    [foo, bar, baz, foo, bar, baz]
    L: [records-consumed-total = 6.0, records-consumed-rate = 0.1996472897880411, records-consumed-total = 6.0, records-consumed-rate = 0.1996539331824837]
    

    I am not sure why the metrics are duplicated but, as I said, all we do is call the consumer's metrics method.

    By the way, if you want to stop the container from the listener, you should use the async stop - see my example.