I am using spring-kafka 2.2.8 to create a batch consumer, and I get the following exception when I deploy it:
java.lang.ClassCastException: org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter cannot be cast to org.springframework.kafka.listener.MessageListener
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:455) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:433) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:310) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:62) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:200) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:172) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:146) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:164) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:158) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:263) ~[spring-kafka-2.2.7.RELEASE.jar!/:2.2.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:862) ~[spring-beans-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:877) ~[spring-context-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.9.RELEASE.jar!/:5.1.9.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:743) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:390) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.boot.SpringApplication.run(SpringApplication.java:1214) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.boot.SpringApplication.run(SpringApplication.java:1203) ~[spring-boot-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at com.abc.xyz.yyy.kafka.integrationtestapi.Application.main(Application.java:10) ~[classes/:?]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_202]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_202]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_202]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_202]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) ~[app/:?]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) ~[app/:?]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.boot.loader.Launcher.launch(Launcher.java:51) ~[app/:?]
2020-11-11T13:38:31.97-0500 [APP/PROC/WEB/0] OUT at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:52) ~[app/:?]
Here is my consumer config:
    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerConfigs(), stringKeyDeserializer(), avroValueDeserializer());
    }

    @Bean
    public RetryPolicy getRetryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(3);
        return simpleRetryPolicy;
    }

    @Bean
    public FixedBackOffPolicy getBackOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(100);
        return backOffPolicy;
    }

    @Bean
    public RetryTemplate getRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(getRetryPolicy());
        retryTemplate.setBackOffPolicy(getBackOffPolicy());
        return retryTemplate;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setStatefulRetry(true);
        factory.setRetryTemplate(getRetryTemplate());
        return factory;
    }
Now my question is,
RetryTemplate is not supported for a batch listener (we don't know which record in the batch failed).
It's best to use a RetryTemplate directly in the listener for that case, like this:
    @KafkaListener(...)
    void listen(List<Foo> in) {
        in.forEach(foo -> {
            this.retryTemplate.execute(context -> {
                // process the foo
                return null; // RetryCallback must return a value
            }, context -> {
                // retries exhausted - save somewhere and move on to the next
                return null; // RecoveryCallback must return a value
            });
        });
    }
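With the retry moved into the listener, the container factory from the question presumably no longer needs its retry settings. The ClassCastException in the stack trace appears to come from the factory trying to wrap the batch adapter in a record-level retry adapter. Below is a minimal sketch of the factory with those two calls removed. It assumes spring-kafka is on the classpath and that a ConsumerFactory bean is already defined as in the question. `BatchListenerConfig` and the nested `Foo` class are placeholder names, not part of the original code.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class BatchListenerConfig {

    // Placeholder payload type (assumption); substitute the real Avro class.
    public static class Foo {
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Foo> kafkaListenerContainerFactory(
            ConsumerFactory<String, Foo> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Foo> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        // Deliberately no setRetryTemplate(...) or setStatefulRetry(...):
        // container-level retry is not supported for batch listeners.
        return factory;
    }
}
```

The RetryTemplate bean itself can stay in the configuration so that it can be injected into the listener class.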
Starting with version 2.3.7, you can configure a RetryingBatchErrorHandler, and the whole batch will be retried.
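As a rough sketch of what that configuration could look like (assuming spring-kafka 2.3.7 or later), the handler can be set on the container factory with a back-off and a recoverer. The class name `WholeBatchRetrySketch`, the helper method, the back-off values, and the logging recoverer are illustrative assumptions.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.RetryingBatchErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public final class WholeBatchRetrySketch {

    private WholeBatchRetrySketch() {
    }

    // Hypothetical helper: call it from the @Bean method that builds the factory.
    public static void configureWholeBatchRetry(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.setBatchListener(true);
        factory.setBatchErrorHandler(new RetryingBatchErrorHandler(
                // Wait 100 ms between attempts, retry twice (3 deliveries in total).
                new FixedBackOff(100L, 2L),
                // Called for each record of the batch once retries are exhausted.
                (record, exception) -> System.err.println(
                        "Giving up on " + record.topic() + "-" + record.partition()
                                + "@" + record.offset() + ": " + exception.getMessage())));
    }
}
```

Because the whole batch is redelivered, the listener should tolerate seeing records it has already processed.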
Starting with version 2.5, you can configure a RecoveringBatchErrorHandler, where the listener throws a special exception that tells the handler which record in the batch failed, so only the unprocessed records are redelivered.
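To make the 2.5 option more concrete, here is a hedged sketch. As far as I know, the special exception is BatchListenerFailedException, which carries the index of the failed record. The class name, listener id, topic name, payload type, and the `process` method are placeholders rather than anything from the question.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.listener.RecoveringBatchErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class IndexedFailureSketch {

    // Hypothetical helper: call it from the @Bean method that builds the factory.
    public static void configure(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.setBatchListener(true);
        factory.setBatchErrorHandler(new RecoveringBatchErrorHandler(
                // Recoverer for the failed record once retries are exhausted.
                (record, exception) -> System.err.println("Skipping offset " + record.offset()),
                // Wait 100 ms between attempts, retry twice.
                new FixedBackOff(100L, 2L)));
    }

    @KafkaListener(id = "batchSketch", topics = "some-topic") // placeholder id and topic
    public void listen(List<ConsumerRecord<String, String>> records) {
        for (int i = 0; i < records.size(); i++) {
            try {
                process(records.get(i));
            }
            catch (Exception e) {
                // The index tells the handler where in the batch the failure happened.
                throw new BatchListenerFailedException("Failed to process record", e, i);
            }
        }
    }

    // Placeholder for the real business logic.
    private void process(ConsumerRecord<String, String> record) {
        // process the record
    }
}
```

If the listener throws any other exception type, the handler cannot tell which record failed, so it is worth wrapping processing failures as shown.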