Search code examples
spring-bootapache-kafkakafka-consumer-apispring-kafka

Spring Kafka RecoveryCallback migration


I am using spring kafka 2.X and now want to migrate to 3.0

Currently:

I have a listener container factory setup for retry functionality only, where I am setting up recovery like below:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> retryConcurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    // Configure the RetryTemplate
    RetryTemplate retryTemplate = new RetryTemplate();
    // Configure your retry policy, backoff policy, and any other necessary options
    // retryTemplate.setRetryPolicy(...);
    // retryTemplate.setBackOffPolicy(...);

  
    RetryOperationsInterceptor interceptor = new RetryOperationsInterceptor();
    interceptor.setRetryOperations(retryTemplate);
    factory.setRetryTemplate(retryTemplate);

    
    RecoveryCallback<Object> recoveryCallback = context -> {
        

        ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>)        context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
        System.out.println("Recovery callback invoked for record: " + record.value());


     


    Acknowledgment acknowledgment = (Acknowledgment)              context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
        
        System.out.println("Recovery callback invoked for record: " + record.value());
        
        
        if (acknowledgment != null) {
            // Manually acknowledge the record
            acknowledgment.acknowledge();
        }


        return null; 
    };
    factory.setRecoveryCallback(recoveryCallback);

    // Other configuration options for the factory
    return factory;
}

Now using kafka spring version 3.0 how can I achieve the same thing ?

I refered to official documentation but it couldnt help.


Solution

  • See a DefaultErrorHandler with its BackOff and ConsumerRecordRecoverer options: https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#default-eh.

    So, instead of that factory.setRetryTemplate(retryTemplate); and factory.setRecoveryCallback(recoveryCallback); you have to use factory.setCommonErrorHandler(new DefaultErrorHandler(...));