Search code examples
spring-bootspring-kafka

I want to enable Kafka errorhandling by annotation


I tried to enable spring-kafka errorhandling by annotation.

So I created a bean kafkaListenerErrorHandler.

@Bean
public KafkaListenerErrorHandler kafkaListenerErrorHandler(ErrorHandler errorHandler) {
    KafkaListenerErrorHandler kafkaListenerErrorHandler = (message, exception) -> {
        MessageHeaders headers = message.getHeaders();
        ConsumerRecord<String, EventV1> consumerRecord = new ConsumerRecord<>(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class),
                headers.get(KafkaHeaders.OFFSET, Long.class),
                headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY, String.class),
                (EventV1) message.getPayload());
        errorHandler.handle(exception, Collections.singletonList(consumerRecord), headers.get(KafkaHeaders.CONSUMER, Consumer.class), messageListenerContainer);
        return null;
    };

    return kafkaListenerErrorHandler;
}

and wired it with the KafkaListener by

@Autowired(required = false)
MessageListenerContainer messageListenerContainer;

@Autowired
private UserBO userBO;

@Bean
ErrorHandler errorHandler(KafkaTemplate kafkaTemplate) {
    return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 2));
}

@KafkaListener(topics = "#{'${kafka.listener.topics}'.split(',')}", errorHandler = "kafkaListenerErrorHandler")
public void listen(ConsumerRecord<String, EventV1> message) {

    log.info("Received Messasge: {}", message);
    try {
        EventV1 value = message.value();
        userBO.getUserById(value.getCustomerId());
    } catch (Exception e) {
        throw new ListenerExecutionFailedException(e.getLocalizedMessage(), e);
    }
}

Even if it seems to work, the code seems ugly. The kafkaListenerErrorHandler constructs a new ConsumerRecord to forward it to the ErrorHandler and the "messageListenerContainer" is null, because I didn't find out how to get it into my context.

There should or must be a more elegant way to connect the ErrorHandler with the KafkaListenerErrorHandler

I also added a deserialization error handler to my setup.

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* Thanks for any advice.


Solution

  • You don't need the listener error handler; spring boot will automatically wire the errorHandler into the listener container and it will be called from there if the listener throws an exception.

    Also what you are doing is wrong since the error handler will only seek the current record and there are likely to be more records fetched in the last poll.

    KafkaListenerErrorHandlers are higher up in the stack and are used for more advanced error handling.

    For example, in a request/reply scenario, (the listener method returns a reply value), a listener error handler can return an error reply to the sender of the request message.

    EDIT

    Exceptions can be classified as "not retryable".

    See the documentation.

    Starting with version 2.3, the SeekToCurrentErrorHandler considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are:

    • DeserializationException
    • MessageConversionException
    • MethodArgumentResolutionException
    • NoSuchMethodException
    • ClassCastException

    since these exceptions are unlikely to be resolved on a retried delivery.

    You can add more exception types to the not-retryable category, or completely replace the BinaryExceptionClassifier with your own configured classifier. See the Javadocs for SeekToCurrentErrorHandler for more information, as well as those for the spring-retry BinaryExceptionClassifier.

    Here is an example that adds IllegalArgumentException to the not-retryable exceptions:

    @Bean
    public SeekToCurrentErrorHandler errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
        SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer);
        handler.addNotRetryableException(IllegalArgumentException.class);
        return handler;
    }