spring, spring-boot, apache-kafka, spring-kafka

Implement Custom Error Handler for Batch Listener in Spring Kafka


I am adding a custom error handler to my container factory.

Here is my requirement: I have a batch listener, and I want to retry the entire batch even if only one record fails with an exception. The retries should continue indefinitely until someone intervenes manually. The custom error handler should therefore be controlled by a flag: if the flag is true, keep retrying until the records are processed; if the flag is false, skip (recover) the batch and move on to the next one.

I know how to retry, but I am not sure how to skip (recover) and move on to the next batch. Here is my code:

Version: Spring Boot 2.6.x, spring-kafka 2.9

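import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

public class CustomBatchErrorHandler extends DefaultErrorHandler {

    // Assumes SLF4J; the original snippet does not show how LOG is declared.
    private static final Logger LOG = LoggerFactory.getLogger(CustomBatchErrorHandler.class);

    // Read once at construction time; getFlag() is defined elsewhere (not shown).
    private final boolean shouldRetry;

    public CustomBatchErrorHandler() {
        super();
        shouldRetry = getFlag();
    }

    @Override
    public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
            MessageListenerContainer container, Runnable invokeListener) {
        if (shouldRetry) {
            LOG.warn("Retrying batch due to error");
            super.handleBatch(thrownException, data, consumer, container, invokeListener);
        } else {
            // Need help here to skip the batch and move to the next batch
            LOG.warn("Error handler skipped due to error");
        }
    }

}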


Solution

  • Skipping the whole batch means committing its offsets and letting the consumer move on to the next position. Technically, you don't need to do anything in that else branch: just don't throw an exception, and the framework will take care of committing those offsets. The listener container invokes the error handler and then commits:

                    invokeBatchErrorHandler(records, recordList, e);
                    commitOffsetsIfNeededAfterHandlingError(records);
    

    In DefaultErrorHandler, the ackAfterHandle property is true by default, so the offsets are committed once the handler returns normally. Note that this is what the current 3.1.x version does; I am not sure whether the behavior is slightly different in that older Spring for Apache Kafka version.
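
The commit-after-handling behavior described above can be illustrated with a small, self-contained Java model. This is only a sketch under stated assumptions: ContainerModel, BatchErrorHandler and FlagControlledHandler are hypothetical stand-ins written for this example, not Spring Kafka classes, and the real container does considerably more (seeks, consumer pauses, back-off). It also assumes the flag is read on every failure through an AtomicBoolean rather than once in the constructor, so that flipping it at runtime can end the retry loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class BatchSkipSketch {

    /** Hypothetical stand-in for a batch error handler; not a Spring Kafka type. */
    interface BatchErrorHandler {
        void handleBatch(RuntimeException thrown, List<String> batch, Runnable invokeListener);
    }

    /** Retries the whole batch while the flag is true; returns normally (skips) once it is false. */
    static final class FlagControlledHandler implements BatchErrorHandler {
        private final AtomicBoolean shouldRetry;

        FlagControlledHandler(AtomicBoolean shouldRetry) {
            this.shouldRetry = shouldRetry;
        }

        @Override
        public void handleBatch(RuntimeException thrown, List<String> batch, Runnable invokeListener) {
            // The flag is re-read before every attempt; this sketch applies no back-off.
            while (shouldRetry.get()) {
                try {
                    invokeListener.run();
                    return; // the retry succeeded
                } catch (RuntimeException stillFailing) {
                    // fall through and check the flag again
                }
            }
            // Flag is false: return without throwing, which is what "skip" means here.
        }
    }

    /** Simplified model of the container: handle the error, then commit. */
    static final class ContainerModel {
        final List<String> committed = new ArrayList<>();

        void process(List<String> batch, Consumer<List<String>> listener, BatchErrorHandler handler) {
            try {
                listener.accept(batch);
            } catch (RuntimeException e) {
                // If the handler itself throws, the commit below is never reached.
                handler.handleBatch(e, batch, () -> listener.accept(batch));
            }
            committed.addAll(batch); // models committing the batch offsets
        }
    }

    public static void main(String[] args) {
        AtomicBoolean shouldRetry = new AtomicBoolean(false); // flag off: skip on failure
        ContainerModel container = new ContainerModel();
        container.process(List.of("r1", "r2"),
                batch -> { throw new IllegalStateException("boom"); },
                new FlagControlledHandler(shouldRetry));
        System.out.println("committed = " + container.committed); // prints "committed = [r1, r2]"
    }
}
```

In the actual handler, the equivalent of the skip branch is simply returning from handleBatch without rethrowing, as the answer says.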