I tried to enable spring-kafka error handling via the annotation, so I created a kafkaListenerErrorHandler bean:
    @Bean
    public KafkaListenerErrorHandler kafkaListenerErrorHandler(ErrorHandler errorHandler) {
        KafkaListenerErrorHandler kafkaListenerErrorHandler = (message, exception) -> {
            MessageHeaders headers = message.getHeaders();
            ConsumerRecord<String, EventV1> consumerRecord = new ConsumerRecord<>(
                    headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                    headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class),
                    headers.get(KafkaHeaders.OFFSET, Long.class),
                    headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY, String.class),
                    (EventV1) message.getPayload());
            errorHandler.handle(exception, Collections.singletonList(consumerRecord),
                    headers.get(KafkaHeaders.CONSUMER, Consumer.class), messageListenerContainer);
            return null;
        };
        return kafkaListenerErrorHandler;
    }
and wired it to the @KafkaListener like this:
    @Autowired(required = false)
    MessageListenerContainer messageListenerContainer;

    @Autowired
    private UserBO userBO;

    @Bean
    ErrorHandler errorHandler(KafkaTemplate kafkaTemplate) {
        return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 2));
    }

    @KafkaListener(topics = "#{'${kafka.listener.topics}'.split(',')}", errorHandler = "kafkaListenerErrorHandler")
    public void listen(ConsumerRecord<String, EventV1> message) {
        log.info("Received Message: {}", message);
        try {
            EventV1 value = message.value();
            userBO.getUserById(value.getCustomerId());
        } catch (Exception e) {
            throw new ListenerExecutionFailedException(e.getLocalizedMessage(), e);
        }
    }
Although this seems to work, the code is ugly. The kafkaListenerErrorHandler constructs a new ConsumerRecord just to forward it to the ErrorHandler, and messageListenerContainer is null because I could not find a way to get it into my context.
There must be a more elegant way to connect the ErrorHandler with the KafkaListenerErrorHandler.
I also added a deserialization error handler to my setup.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
Thanks for any advice.
You don't need the listener error handler; Spring Boot will automatically wire the errorHandler bean into the listener container, and it will be called from there if the listener throws an exception.
Also, what you are doing is wrong, since the error handler will only seek the current record, and there are likely to be more records fetched in the last poll.
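To illustrate the point about seeking, here is a minimal plain-Java sketch. It is a simplified model, not Spring Kafka's actual code: the Rec class and the seekPositions method are hypothetical names, and the model assumes that a seek-based handler rewinds each partition to the first offset it finds in the list of records it is given. Handing it only the failed record therefore leaves every other partition from the same poll untouched.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SeekSketch {

    /** Hypothetical stand-in for the coordinates of a ConsumerRecord. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * For each partition present in the given records, keep the first offset seen.
     * Records of one partition arrive in offset order, so this is the position
     * the consumer would be rewound to in this model.
     */
    static Map<String, Long> seekPositions(List<Rec> records) {
        Map<String, Long> positions = new LinkedHashMap<>();
        for (Rec r : records) {
            positions.putIfAbsent(r.topic + "-" + r.partition, r.offset);
        }
        return positions;
    }

    public static void main(String[] args) {
        // The failed record first, followed by the unprocessed rest of the same poll.
        List<Rec> remaining = Arrays.asList(
                new Rec("events", 0, 5), new Rec("events", 0, 6), new Rec("events", 1, 9));

        // Given all remaining records, both partitions are rewound.
        System.out.println(seekPositions(remaining)); // {events-0=5, events-1=9}

        // Given only the failed record, partition 1 is never rewound.
        System.out.println(seekPositions(Collections.singletonList(remaining.get(0)))); // {events-0=5}
    }
}
```

When the container calls the error handler itself, it can pass along everything that is still unprocessed, which a hand-built singleton list cannot do.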
KafkaListenerErrorHandlers are higher up in the stack and are used for more advanced error handling.
For example, in a request/reply scenario (where the listener method returns a reply value), a listener error handler can return an error reply to the sender of the request message.
EDIT
Exceptions can be classified as "not retryable".
See the documentation.
Starting with version 2.3, the SeekToCurrentErrorHandler considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are:
DeserializationException
MessageConversionException
MethodArgumentResolutionException
NoSuchMethodException
ClassCastException
since these exceptions are unlikely to be resolved on a retried delivery.
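The effect of this classification can be sketched in plain Java. This is only a model of the behavior described above, not the library's implementation: NOT_RETRYABLE, isRetryable and attemptsBeforeRecovery are hypothetical names, the listener is assumed to fail with the same exception on every delivery, and only the top-level exception is inspected.

```java
import java.util.HashSet;
import java.util.Set;

public class NotRetryableSketch {

    // Hypothetical stand-in for the set of exception types treated as fatal.
    static final Set<Class<? extends Exception>> NOT_RETRYABLE = new HashSet<>();

    static {
        NOT_RETRYABLE.add(ClassCastException.class);
        NOT_RETRYABLE.add(NoSuchMethodException.class);
    }

    /** An exception is retryable unless it is an instance of a fatal type. */
    static boolean isRetryable(Exception e) {
        for (Class<? extends Exception> type : NOT_RETRYABLE) {
            if (type.isInstance(e)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Counts the delivery attempts made before the record is handed to the
     * recoverer, assuming every attempt fails with the given exception.
     */
    static int attemptsBeforeRecovery(Exception failure, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            if (!isRetryable(failure)) {
                break; // fatal: skip the remaining retries
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        // A retryable failure uses up all attempts before recovery.
        System.out.println(attemptsBeforeRecovery(new IllegalStateException("transient"), 3)); // 3
        // A fatal failure goes to the recoverer after the first attempt.
        System.out.println(attemptsBeforeRecovery(new ClassCastException("bad type"), 3)); // 1
    }
}
```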
You can add more exception types to the not-retryable category, or completely replace the BinaryExceptionClassifier with your own configured classifier. See the Javadocs for SeekToCurrentErrorHandler for more information, as well as those for the spring-retry BinaryExceptionClassifier.
Here is an example that adds IllegalArgumentException to the not-retryable exceptions:
    @Bean
    public SeekToCurrentErrorHandler errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
        SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer);
        handler.addNotRetryableException(IllegalArgumentException.class);
        return handler;
    }