I am using Spring Kafka 2.x and now want to migrate to 3.0.
Currently:
I have a listener container factory set up for retry functionality only, where I configure recovery as shown below:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> retryConcurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Configure the RetryTemplate
    RetryTemplate retryTemplate = new RetryTemplate();
    // Configure your retry policy, backoff policy, and any other necessary options
    // retryTemplate.setRetryPolicy(...);
    // retryTemplate.setBackOffPolicy(...);
    RetryOperationsInterceptor interceptor = new RetryOperationsInterceptor();
    interceptor.setRetryOperations(retryTemplate);
    factory.setRetryTemplate(retryTemplate);
    RecoveryCallback<Object> recoveryCallback = context -> {
        ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
        Acknowledgment acknowledgment = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
        System.out.println("Recovery callback invoked for record: " + record.value());
        if (acknowledgment != null) {
            // Manually acknowledge the record
            acknowledgment.acknowledge();
        }
        return null;
    };
    factory.setRecoveryCallback(recoveryCallback);
    // Other configuration options for the factory
    return factory;
}
Now, using Spring Kafka 3.0, how can I achieve the same thing?
I referred to the official documentation, but it didn't help.
See the DefaultErrorHandler with its BackOff and ConsumerRecordRecoverer options: https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#default-eh.
So, instead of factory.setRetryTemplate(retryTemplate); and factory.setRecoveryCallback(recoveryCallback); you have to use factory.setCommonErrorHandler(new DefaultErrorHandler(...));
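For illustration, here is a rough sketch of what the migrated factory could look like. It is not a drop-in replacement: the class name RetryKafkaConfig, the injected ConsumerFactory parameter (standing in for the consumerFactory() call in the question), and the FixedBackOff values of one second and two retries are all assumptions chosen for the example, so adjust them to your setup.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class name, used only for this sketch.
@Configuration
public class RetryKafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> retryConcurrentKafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory) { // assumed to be defined elsewhere

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // Takes the place of the RecoveryCallback: called once the retries are exhausted.
        // It receives the failed record and the exception, not an Acknowledgment.
        ConsumerRecordRecoverer recoverer = (ConsumerRecord<?, ?> failedRecord, Exception ex) ->
                System.out.println("Recoverer invoked for record: " + failedRecord.value()
                        + ", cause: " + ex.getMessage());

        // Takes the place of the RetryTemplate policies.
        // Assumed values: wait 1000 ms between attempts, retry at most 2 times.
        FixedBackOff backOff = new FixedBackOff(1000L, 2L);

        factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer, backOff));

        // Other configuration options for the factory
        return factory;
    }
}
```

Note that the recoverer has no Acknowledgment to call, unlike the old RecoveryCallback. How the offset of a recovered record gets committed depends on the container's ack mode and the error handler's settings, so it is worth checking the documentation linked above if you rely on manual acknowledgment.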