Spring Kafka - CommonErrorHandler ignored on DeserializationException


I would like some clarification from Spring Kafka experts on the reasoning behind the following error-handling behaviour.

My intention was to expose a DefaultErrorHandler with an ExponentialBackOff as the default CommonErrorHandler and have it catch essentially all possible exceptions. If an error occurs, monitoring/alerting kicks in and I fix whatever went wrong. No DLQ is configured for now, so we would be blocking the queue until the problem is fixed.

@Bean
CommonErrorHandler commonErrorHandler() {
    var exponentialBackOff = new ExponentialBackOff();
    exponentialBackOff.setMaxInterval(Duration.ofHours(1).toMillis());
    return new DefaultErrorHandler(exponentialBackOff);
}
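
To make the intended retry schedule concrete, here is a minimal plain-Java sketch of a capped exponential back-off. It is a simplified model, not Spring's implementation; the class name `BackOffSketch` is hypothetical, and the initial interval of 2000 ms and multiplier of 1.5 are assumed to match the defaults of Spring's `ExponentialBackOff`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a capped exponential back-off; not Spring's implementation.
class BackOffSketch {

    // Returns the first `count` delays in milliseconds.
    static List<Long> intervals(long initialMs, double multiplier, long maxMs, int count) {
        List<Long> result = new ArrayList<>();
        long current = Math.min(initialMs, maxMs);
        for (int i = 0; i < count; i++) {
            result.add(current);
            // Grow the delay, truncate to whole milliseconds, and cap it at maxMs.
            current = Math.min((long) (current * multiplier), maxMs);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed defaults: 2000 ms initial interval, multiplier 1.5.
        // The cap of one hour (3,600,000 ms) mirrors setMaxInterval in the bean above.
        System.out.println(intervals(2000, 1.5, 3_600_000, 25));
    }
}
```

Under these assumptions the delays start at 2000, 3000, 4500 and 6750 ms and reach the one-hour cap at the twentieth delay. Since no upper bound on elapsed time is set in the bean, retries are expected to continue at that cap, which matches the "block the queue until fixed" intent.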

The consumer is configured through the following properties:

spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
spring.kafka.consumer.properties.spring.kafka.key.serialization.bytopic.default=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.kafka.key.serialization.bytopic.config=\
    topic.order.archive-invoice-event:org.apache.kafka.common.serialization.IntegerDeserializer

The listener looks as follows:

@KafkaListener(topics = "topic.order.archive-invoice-event")
void onArchiveInvoice(ArchiveInvoiceEvent event) {
    invoiceService.archiveInvoice(event.getOrderId());
}

If an exception occurs at runtime while the listener is processing an event, everything is fine: the commonErrorHandler takes over as expected.

However, I ran into an issue when the producer changed the type of the serialized key from Integer to String.

At runtime I got the following log:

Backoff FixedBackOff{interval=0, currentAttempts=1, maxAttempts=0} exhausted for topic.order.archive-invoice-event-1@489253

And the stack trace:

o.a.k.c.e.SerializationException: Size of data received by IntegerDeserializer is not 4
    at o.a.k.c.s.IntegerDeserializer.deserialize(IntegerDeserializer.java:30)
    at o.a.k.c.s.IntegerDeserializer.deserialize(IntegerDeserializer.java:24)
    at o.a.k.c.s.Deserializer.deserialize(Deserializer.java:62)
    at o.s.k.s.s.DelegatingByTopicDeserializer.deserialize(DelegatingByTopicDeserializer.java:78)
    at o.s.k.s.s.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:215)
    ... 16 common frames omitted
Wrapped by: o.s.k.s.s.DeserializationException: failed to deserialize
    at o.s.k.s.s.SerializationUtils.deserializationException(SerializationUtils.java:158)
    at o.s.k.s.s.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:218)
    at o.a.k.c.s.Deserializer.deserialize(Deserializer.java:73)
    at o.a.k.c.c.i.CompletedFetch.parseRecord(CompletedFetch.java:319)
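
The message "Size of data received by IntegerDeserializer is not 4" follows from the key format: an integer key is expected to be exactly four bytes, whereas a string key is however many bytes its text encodes to. The plain-Java sketch below imitates that length check. The class `IntegerKeySketch`, the method `decodeIntKey` and the key value 42 are hypothetical stand-ins, not Kafka's code, and UTF-8 is assumed as the string encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-in for the 4-byte length check of an integer deserializer; not Kafka's class.
class IntegerKeySketch {

    static int decodeIntKey(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException("Size of data is not 4, got " + data.length);
        }
        // ByteBuffer reads big-endian by default.
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        // Key written as an integer: always 4 bytes.
        byte[] intKey = ByteBuffer.allocate(4).putInt(42).array();
        // The same key written as a string: 2 bytes in UTF-8.
        byte[] stringKey = "42".getBytes(StandardCharsets.UTF_8);

        System.out.println(decodeIntKey(intKey));
        try {
            decodeIntKey(stringKey);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Note that under this model a four-character ASCII key would pass the length check and decode to a meaningless integer, so the failure only shows up for keys of other lengths.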

I traced the exception, and the only place where a FixedBackOff policy is used is in FailedRecordProcessor (used by AbstractMessageListenerContainer).

This FailedRecordProcessor applies some magic and overrules the commonErrorHandler for classified exceptions (such as DeserializationException).

Two questions here:

  1. Why? Is that really intended or have I misconfigured anything?
  2. Is there any way to capture deserialization exceptions (key / value)? Based on this Baeldung article, I was led to believe this was the right approach, and the Spring documentation is not clear enough to me.

Thank you for your enlightenment

Dependencies:

  • spring-boot: 3.3.5
  • spring-kafka: 3.2.4

Solution

  • Why? Is that really intended or have I misconfigured anything?

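    As mentioned by @artem, the documentation lists some exceptions that are considered fatal by default. Retries are skipped for them, so the configured back-off never applies and the recoverer is invoked on the first failure: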

    DeserializationException
    MessageConversionException
    ConversionException
    MethodArgumentResolutionException
    NoSuchMethodException
    ClassCastException
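
One way to picture this list is as a lookup table from exception type to "retryable or not", with a default for anything not in the table. The plain-Java sketch below is a simplified model of that idea, not Spring Kafka's implementation; `ClassifierSketch` and its nested `DeserializationException` are hypothetical stand-ins.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of a retryable / non-retryable classification; not Spring Kafka's code.
class ClassifierSketch {

    // Hypothetical stand-in for Spring Kafka's DeserializationException.
    static class DeserializationException extends RuntimeException {
        DeserializationException(String message) {
            super(message);
        }
    }

    private final Map<Class<? extends Throwable>, Boolean> classifications;
    private final boolean defaultValue;

    ClassifierSketch(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
        this.classifications = new LinkedHashMap<>(classifications);
        this.defaultValue = defaultValue;
    }

    // true: retry with the configured back-off; false: skip retries.
    boolean isRetryable(Throwable error) {
        // Check the exception and each of its causes against the table.
        for (Throwable t = error; t != null; t = t.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, Boolean> entry : classifications.entrySet()) {
                if (entry.getKey().isInstance(t)) {
                    return entry.getValue();
                }
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        var withFatalList = new ClassifierSketch(
                Map.of(DeserializationException.class, false, ClassCastException.class, false), true);
        var emptyTable = new ClassifierSketch(Map.of(), true);

        var error = new DeserializationException("failed to deserialize");
        System.out.println(withFatalList.isRetryable(error));
        System.out.println(emptyTable.isRetryable(error));
    }
}
```

In this model, an entry mapped to false short-circuits the back-off, while an empty table with a default of true treats every exception the same way.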
    

    Therefore, for anyone who wants to apply the same error-handling strategy regardless of the origin of the error, the defaults provided by Spring can be overridden as follows:

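    @Bean
    CommonErrorHandler commonErrorHandler() {

        var exponentialBackOff = new ExponentialBackOff();
        exponentialBackOff.setMaxInterval(Duration.ofHours(1).toMillis());

        var handler = new DefaultErrorHandler(exponentialBackOff);

        // Unified error handling: replace the default classifications with an
        // empty map and a default of true, so every exception (including
        // DeserializationException) is retried with the back-off above.
        handler.setClassifications(Map.of(), true);

        return handler;
    }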
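
If only deserialization failures should be retried while the other default fatal exceptions keep skipping retries, a narrower variant may be enough. The sketch below assumes that `DefaultErrorHandler` exposes `removeClassification` (inherited from its exception classifier base class) in the spring-kafka version in use; the configuration class name `KafkaErrorHandlingConfig` is hypothetical. It is a configuration fragment to adapt, not a verified drop-in replacement.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.util.backoff.ExponentialBackOff;

@Configuration
class KafkaErrorHandlingConfig { // hypothetical class name

    @Bean
    CommonErrorHandler commonErrorHandler() {
        var exponentialBackOff = new ExponentialBackOff();
        exponentialBackOff.setMaxInterval(Duration.ofHours(1).toMillis());

        var handler = new DefaultErrorHandler(exponentialBackOff);

        // Assumption: this removes only the DeserializationException entry from
        // the default non-retryable list; the other defaults stay fatal.
        handler.removeClassification(DeserializationException.class);

        return handler;
    }
}
```

The trade-off compared with `setClassifications(Map.of(), true)` is scope: this variant keeps errors such as `ClassCastException` out of the retry loop, while the empty-map version retries everything.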