I am using spring-kafka 2.2.8 to create a batch consumer, and I am trying to capture my container metrics to understand the performance characteristics of the batch consumer.
@Bean
public ConsumerFactory consumerFactory() {
    return new DefaultKafkaConsumerFactory(consumerConfigs(), stringKeyDeserializer(), avroValueDeserializer());
}

@Bean
public FixedBackOffPolicy getBackOffPolicy() {
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(100);
    return backOffPolicy;
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaBatchListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setStatefulRetry(true);
    return factory;
}
public Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<>();
    batchConsumerConfigProperties.setKeyDeserializerClassConfig();
    batchConsumerConfigProperties.setValueDeserializerClassConfig();
    batchConsumerConfigProperties.setKeyDeserializerClass(StringDeserializer.class);
    batchConsumerConfigProperties.setValueDeserializerClass(KafkaAvroDeserializer.class);
    batchConsumerConfigProperties.setSpecificAvroReader("true");
    batchConsumerConfigProperties.setAutoOffsetResetConfig(environment.getProperty("sapphire.kes.consumer.auto.offset.reset", "earliest"));
    batchConsumerConfigProperties.setEnableAutoCommitConfig(environment.getProperty("sapphire.kes.consumer.enable.auto.commit", "false"));
    batchConsumerConfigProperties.setMaxPollIntervalMs(environment.getProperty(MAX_POLL_INTERVAL_MS_CONFIG, "300000"));
    batchConsumerConfigProperties.setMaxPollRecords(environment.getProperty(MAX_POLL_RECORDS_CONFIG, "50000"));
    batchConsumerConfigProperties.setSessionTimeoutms(environment.getProperty(SESSION_TIMEOUT_MS_CONFIG, "10000"));
    batchConsumerConfigProperties.setRequestTimeOut(environment.getProperty(REQUEST_TIMEOUT_MS_CONFIG, "30000"));
    batchConsumerConfigProperties.setHeartBeatIntervalMs(environment.getProperty(HEARTBEAT_INTERVAL_MS_CONFIG, "3000"));
    batchConsumerConfigProperties.setFetchMinBytes(environment.getProperty(FETCH_MIN_BYTES_CONFIG, "1"));
    batchConsumerConfigProperties.setFetchMaxBytes(environment.getProperty(FETCH_MAX_BYTES_CONFIG, "52428800"));
    batchConsumerConfigProperties.setFetchMaxWaitMS(environment.getProperty(FETCH_MAX_WAIT_MS_CONFIG, "500"));
    batchConsumerConfigProperties.setMaxPartitionFetchBytes(environment.getProperty(MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"));
    batchConsumerConfigProperties.setConnectionsMaxIdleMs(environment.getProperty(CONNECTIONS_MAX_IDLE_MS_CONFIG, "540000"));
    batchConsumerConfigProperties.setAutoCommitIntervalMS(environment.getProperty(AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"));
    batchConsumerConfigProperties.setReceiveBufferBytes(environment.getProperty(RECEIVE_BUFFER_CONFIG, "65536"));
    batchConsumerConfigProperties.setSendBufferBytes(environment.getProperty(SEND_BUFFER_CONFIG, "131072"));
    // NOTE: as posted, nothing is ever copied into 'configs' and the method has no
    // return statement; the properties set above need to end up in this map.
    return configs;
}
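For reference, here is a minimal sketch of what a `consumerConfigs()` that actually populates and returns the map could look like. This is an illustration only: the bootstrap servers and group id are placeholder values, and plain string keys (the standard Kafka consumer property names) are used instead of the `ConsumerConfig` constants so the sketch has no Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerConfigSketch {

    // Builds the map that DefaultKafkaConsumerFactory expects. The literal
    // keys are the standard Kafka consumer property names; "localhost:9092"
    // and "my-batch-group" are placeholder values, not from the original post.
    public static Map<String, Object> consumerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put("group.id", "my-batch-group");
        configs.put("enable.auto.commit", "false");
        configs.put("auto.offset.reset", "earliest");
        configs.put("max.poll.records", "50000");
        return configs; // the return statement missing from the original
    }

    public static void main(String[] args) {
        System.out.println(consumerConfigs().size()); // prints 5
    }
}
```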
Here is my consumer code, where I am trying to capture the container metrics:
@Component
public class MyBatchConsumer {

    private final KafkaListenerEndpointRegistry registry;

    // Fields referenced below but not declared in the original post.
    private int mybatchconsumerMessageCount;
    private int targetMessageCount;

    @Autowired
    public MyBatchConsumer(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @KafkaListener(topics = "myTopic", containerFactory = "kafkaBatchListenerContainerFactory", id = "myBatchConsumer")
    public void consumeRecords(List<ConsumerRecord> messages) {
        System.out.println("messages size - " + messages.size());
        if (mybatchconsumerMessageCount == 0) {
            ConsumerPerfTestingConstants.batchConsumerStartTime = System.currentTimeMillis();
            ConsumerPerfTestingConstants.batchConsumerStartDateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss"));
        }
        mybatchconsumerMessageCount = mybatchconsumerMessageCount + messages.size();
        System.out.println("\n\n\n batchConsumerConsumedMessages " + mybatchconsumerMessageCount);
        if (mybatchconsumerMessageCount == targetMessageCount) {
            System.out.println("ATTENTION! ATTENTION! ATTENTION! Consumer Finished processing " + mybatchconsumerMessageCount + " messages");
            registry.getListenerContainerIds().forEach(
                    listenerId -> System.out.println(" kes batch consumer listenerId is " + listenerId));
            String listenerID = registry.getListenerContainerIds().stream()
                    .filter(listenerId -> listenerId.startsWith("myBatchConsumer"))
                    .findFirst().get();
            System.out.println(" kes batch consumer listenerID is " + listenerID);
            Map<String, Map<MetricName, ? extends Metric>> metrics = registry.getListenerContainer(listenerID).metrics();
            registry.getListenerContainer(listenerID).stop();
            System.out.println("metrics - " + metrics);
        }
    }
}
Now, when I consume 10 records and look at the metrics, I see the values below and am not sure why. Can someone help me understand what I am missing here?
records-consumed-total = 0
records-consumed-rate = 0
This works fine for me; I am using 2.6.2, but the container simply delegates to the consumer when metrics() is called.
@SpringBootApplication
public class So64878927Application {

    public static void main(String[] args) {
        SpringApplication.run(So64878927Application.class, args);
    }

    @Autowired
    KafkaListenerEndpointRegistry registry;

    @KafkaListener(id = "so64878927", topics = "so64878927")
    void listen(List<String> in) {
        System.out.println(in);
        Map<String, Map<MetricName, ? extends Metric>> metrics = registry.getListenerContainer("so64878927").metrics();
        System.out.println("L: " + metrics.get("consumer-so64878927-1").entrySet().stream()
                .filter(entry -> entry.getKey().name().startsWith("records-consumed"))
                .map(entry -> entry.getValue().metricName().name() + " = " + entry.getValue().metricValue())
                .collect(Collectors.toList()));
        registry.getListenerContainer("so64878927").stop(() -> System.out.println("Stopped"));
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so64878927").build();
    }

    @EventListener
    void idleEvent(ListenerContainerIdleEvent event) {
        Map<String, Map<MetricName, ? extends Metric>> metrics = registry.getListenerContainer("so64878927").metrics();
        System.out.println("I: " + metrics.get("consumer-so64878927-1").entrySet().stream()
                .filter(entry -> entry.getKey().name().startsWith("records-consumed"))
                .map(entry -> entry.getValue().metricName().name() + " = " + entry.getValue().metricValue())
                .collect(Collectors.toList()));
    }
}
spring.kafka.listener.type=batch
spring.kafka.listener.idle-event-interval=6000
[foo, bar, baz, foo, bar, baz]
L: [records-consumed-total = 6.0, records-consumed-rate = 0.1996472897880411, records-consumed-total = 6.0, records-consumed-rate = 0.1996539331824837]
I am not sure why the metrics are duplicated but, as I said, all we do is call the consumer's metrics() method.
By the way, if you want to stop the container from the listener, you should use the async stop(Runnable) variant, as in my example above; it returns immediately and runs the callback once the container has stopped.
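A plausible explanation for the duplication, by the way: the Kafka consumer registers records-consumed-total and records-consumed-rate twice under the same metric name, once as an aggregate (tagged only with client-id) and once per topic (tagged with client-id and topic), so filtering by name alone prints both. The sketch below illustrates the idea with a hypothetical TaggedMetric class standing in for Kafka's MetricName (which likewise carries a tag map), keeping only the aggregate entries:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MetricDedup {

    // Stand-in for org.apache.kafka.common.MetricName plus a value; the real
    // class also carries a tag map, which is what distinguishes the entries
    // that appear "duplicated" when only the name is printed.
    static class TaggedMetric {
        final String name;
        final Map<String, String> tags;
        final double value;

        TaggedMetric(String name, Map<String, String> tags, double value) {
            this.name = name;
            this.tags = tags;
            this.value = value;
        }
    }

    // Keep only entries without a "topic" tag, i.e. the aggregate metrics,
    // dropping the per-topic entries that share the same name.
    static List<TaggedMetric> aggregatesOnly(List<TaggedMetric> metrics) {
        List<TaggedMetric> out = new ArrayList<>();
        for (TaggedMetric m : metrics) {
            if (!m.tags.containsKey("topic")) {
                out.add(m);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<TaggedMetric> snapshot = List.of(
                new TaggedMetric("records-consumed-total",
                        Map.of("client-id", "consumer-so64878927-1"), 6.0),
                new TaggedMetric("records-consumed-total",
                        Map.of("client-id", "consumer-so64878927-1", "topic", "so64878927"), 6.0));
        System.out.println(aggregatesOnly(snapshot).size()); // prints 1
    }
}
```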