Search code examples
springspring-kafkaretry-logicspring-boot-3

DefaultErrorHandler is not configurable If @RetryableTopic used for retry and DLT handler


Spring boot version : 2.7.6 Spring kafka version : 2.8.11

Issue:

I was trying to handle the deserialization issues in code. To handle such issues in code, I created my own class by extending

DefaultErrorHandler

and overriding the public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {} Sample code as below

public class CustomDefaultErrorHandler extends DefaultErrorHandler {

    private static Logger log = LoggerFactory.getLogger(CustomDefaultErrorHandler.class);
    @Override
    public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
        manageException(thrownException, consumer);
    }

    private void manageException(Exception ex, Consumer<?, ?> consumer) {
        log.error("Error polling message: " + ex.getMessage());
        if (ex instanceof RecordDeserializationException) {
            RecordDeserializationException rde = (RecordDeserializationException) ex;
            consumer.seek(rde.topicPartition(), rde.offset() + 1L);
            consumer.commitSync();
        } else {
            log.error("Exception not handled");
        }
    }
}

If I use the @RetryableTopic along with @KafkaListener

@RetryableTopic(listenerContainerFactory = "kafkaListenerContainerFactory", backoff = @Backoff(delay = 8000, multiplier = 2.0),
        dltStrategy = DltStrategy.FAIL_ON_ERROR
        , traversingCauses = "true", autoCreateTopics = "true", numPartitions = "3", replicationFactor = "3",
        fixedDelayTopicStrategy = FixedDelayStrategy.MULTIPLE_TOPICS, include = {RetriableException.class, RecoverableDataAccessException.class,
        SQLTransientException.class, CallNotPermittedException.class}
)
@KafkaListener(topics = "${topic.name}", groupId = "order", containerFactory = "kafkaListenerContainerFactory", id = "OTR")
public void consumeOTRMessages(ConsumerRecord<String, PayloadsVO> payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName) throws JsonProcessingException {
    logger.info("Payload :{}", payload.value());
    payloadsService.savePayload(payload.value(), pegasusTopicName);

}

What I saw in debugging the code, @RetryableTopic has its own DefaultErrorHandler configurations in

ListenerContainerFactoryConfigurer

and it stops my custom handler and deserialization process wont stop on issue.

Can you please suggest any way since I wanted to use annotations for retry process in my code

I tried to configured my own implementation of

DefaultErrorHandler

by extending it and configured in

ConcurrentKafkaListenerContainerFactory

Solution

  • It's quite involved, but you should be able to override the RetryTopicComponentFactory bean and override listenerContainerFactoryConfigurer() to return your custom error handler.

    That said, deserialization exceptions will go straight to the DLT anyway.

    BTW, calling commitSync() here is worthless because there were no records returned by the poll().