java spring-boot apache-kafka spring-kafka spring-retry

How to achieve Always retry policy/Custom Retry Policy for a Kafka Consumer without causing a rebalance in the group


I am trying to write a resilient Kafka consumer. If an exception occurs while processing a message in the listener method, I would like to retry it: a few times for certain exceptions, always for certain others, and never for the rest. I have read the Spring docs on SeekToCurrentErrorHandler, but I am not 100% sure how to implement this.

I have subclassed ExceptionClassifierRetryPolicy and return the appropriate RetryPolicy based on the exception thrown in the listener method.

I have created a RetryTemplate and set its RetryPolicy to my custom subclass.

I have set the RetryTemplate on the Kafka container factory, set the error handler to a new SeekToCurrentErrorHandler, and set the stateful retry property to true.

Listener Method

@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgment ack) {
    SomeAvroGeneratedClass obj = message.getPayload();
    processIncomingMessage(obj);
    // Reached only if processing did not throw
    ack.acknowledge();
}

Custom Retry Policy class

@Component
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy {

    @PostConstruct
    public void init() {
        final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(8);

        this.setExceptionClassifier(classifiable -> {
            if (classifiable.getCause() instanceof TransientDataAccessException) {
                // Always retry transient data access failures
                return new AlwaysRetryPolicy();
            } else if (classifiable.getCause() instanceof NonTransientDataAccessException) {
                // Never retry non-transient data access failures
                return new NeverRetryPolicy();
            } else {
                // Everything else: up to 8 attempts
                return simpleRetryPolicy;
            }
        });
    }
}

Retry Template and Container Config

@Configuration
public class RetryConfig {

    // MyRetryPolicy is injected as a bean: new MyRetryPolicy() would bypass
    // @PostConstruct, so the exception classifier would never be set.
    @Bean
    public RetryTemplate retryTemplate(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
                                       MyRetryPolicy myRetryPolicy) {

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(myRetryPolicy);
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(2000L);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        factory.setRetryTemplate(retryTemplate);
        // ackOnError is a container property, not a factory property
        factory.getContainerProperties().setAckOnError(false);
        factory.setErrorHandler(new SeekToCurrentErrorHandler());
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback(context -> {
            // configure recovery after retries are exhausted and commit offset
            return null;
        });

        return retryTemplate;
    }
}

Listener properties:

  1. AckMode = MANUAL
  2. enable.auto.commit = false

Question:

  1. With my current code, can I achieve the retry logic defined in MyRetryPolicy without causing a consumer rebalance when AlwaysRetryPolicy is returned? If not, please point me in the right direction.

  2. Is my approach of using both the error handler and the retry template correct?


Solution

  • Stateful retry in conjunction with a SeekToCurrentErrorHandler is the right approach.

    However, if you are using a recent version (2.2 or higher), the error handler gives up after 10 attempts by default; you can make that unlimited by setting maxFailures to -1 (2.2) or by supplying a BackOff whose maximum attempts is Long.MAX_VALUE (2.3 or higher).
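
    As a rough illustration of the point about the error handler's attempt limit, the configuration fragment below sketches an error handler that never gives up. It assumes spring-kafka 2.3 or higher, where SeekToCurrentErrorHandler accepts a BackOff; the class name ErrorHandlerConfig and the 0 ms interval are assumptions made for the example (the RetryTemplate in the question already applies a 2-second back-off), not part of the original setup.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class, named here only for illustration.
@Configuration
public class ErrorHandlerConfig {

    // spring-kafka 2.3+: FixedBackOff.UNLIMITED_ATTEMPTS is Long.MAX_VALUE,
    // so the handler keeps seeking back to the failed record indefinitely.
    // The 0 ms interval is an assumption; pick a delay that suits your case.
    @Bean
    public SeekToCurrentErrorHandler errorHandler() {
        return new SeekToCurrentErrorHandler(
                new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS));
    }

    // spring-kafka 2.2 equivalent (maxFailures = -1 means no limit):
    // return new SeekToCurrentErrorHandler(-1);
}
```

    The resulting bean would then be passed to factory.setErrorHandler(...) in place of the no-argument new SeekToCurrentErrorHandler() used in the question.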