I am trying to write a resilient Kafka consumer. If an exception occurs while processing a message in the listener method, I would like to retry it. I would like to retry a few times for certain exceptions, always for certain others, and never for the rest. I have read the Spring docs regarding SeekToCurrentErrorHandler, but I am not 100% sure how to implement it.
I have subclassed ExceptionClassifierRetryPolicy and am returning the appropriate RetryPolicy based on the exception that occurs in the listener method.
I have created a RetryTemplate and set its RetryPolicy to my custom subclass.
I have set the RetryTemplate on the Kafka container factory. I have set the error handler to a new SeekToCurrentErrorHandler and also set the stateful retry property to true.
Listener Method
@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgment ack) {
    SomeAvroGeneratedClass obj = message.getPayload();
    processIncomingMessage(obj);
    // Acknowledge only after processing succeeds
    ack.acknowledge();
}
Custom Retry Policy class
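import javax.annotation.PostConstruct;
import org.springframework.dao.NonTransientDataAccessException;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.retry.policy.AlwaysRetryPolicy;
import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
import org.springframework.retry.policy.NeverRetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.stereotype.Component;

@Component
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy {

    @PostConstruct
    public void init() {
        // Default policy: retry a limited number of times
        final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(8);

        this.setExceptionClassifier(classifiable -> {
            if (classifiable.getCause() instanceof TransientDataAccessException) {
                // Always retry when the cause is a TransientDataAccessException
                return new AlwaysRetryPolicy();
            } else if (classifiable.getCause() instanceof NonTransientDataAccessException) {
                // Never retry when the cause is a NonTransientDataAccessException
                return new NeverRetryPolicy();
            } else {
                return simpleRetryPolicy;
            }
        });
    }
}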
Retry Template and Container Config
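import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RetryConfig {

    @Bean
    public RetryTemplate retryTemplate(@Autowired ConcurrentKafkaListenerContainerFactory factory,
                                       @Autowired MyRetryPolicy myRetryPolicy) {
        RetryTemplate retryTemplate = new RetryTemplate();
        // Use the Spring-managed bean: with "new MyRetryPolicy()" the @PostConstruct init() never runs
        retryTemplate.setRetryPolicy(myRetryPolicy);
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(2000L);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        factory.setRetryTemplate(retryTemplate);
        // ackOnError is a container property, not a factory property
        factory.getContainerProperties().setAckOnError(false);
        factory.setErrorHandler(new SeekToCurrentErrorHandler());
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset
        );
        return retryTemplate;
    }
}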
Listener properties:
Question:
With my current code, can I achieve the retry logic defined in MyRetryPolicy without causing a consumer rebalance when AlwaysRetryPolicy is returned? If not, please point me in the right direction.
Is my approach right in using the Error handler as well as the Retry?
Stateful retry in conjunction with a SeekToCurrentErrorHandler is the right approach.
However, if you are using a recent version (2.2 or higher), the error handler will give up after 10 attempts; you can make that infinite by setting maxFailures to -1 (2.2) or by using a backOff with Long.MAX_VALUE (2.3 or higher).
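As a minimal sketch of the 2.3+ variant, assuming spring-kafka 2.3 or later is on the classpath: the class name ErrorHandlerSketch and the method name unlimitedErrorHandler are hypothetical, and the 0 ms interval is an assumption (the RetryTemplate in the question already waits 2 seconds between attempts).

```java
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical helper class, for illustration only.
public class ErrorHandlerSketch {

    // Spring Kafka 2.3+: never give up on a failed record.
    // FixedBackOff.UNLIMITED_ATTEMPTS is defined as Long.MAX_VALUE.
    // The 0 ms interval is an assumption; pick whatever pause suits you.
    public static SeekToCurrentErrorHandler unlimitedErrorHandler() {
        return new SeekToCurrentErrorHandler(
                new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS));
    }

    // On Spring Kafka 2.2 the equivalent would be:
    //     new SeekToCurrentErrorHandler(-1)   // maxFailures = -1
}
```

It would then replace the default handler in the container factory configuration, for example factory.setErrorHandler(ErrorHandlerSketch.unlimitedErrorHandler()).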