Search code examples
serializationerror-handlingdeserializationspring-kafka

How to handle SerializationException after deserialization


I am using Avro and Schema registry with my Spring Kafka setup.

I would like to somehow handle the SerializationException, which might be thrown during deserialization.

I found the following two resource:

https://github.com/spring-projects/spring-kafka/issues/164

How do I configure spring-kafka to ignore messages in the wrong format?

These resources suggest that I return null instead of throwing an SerializationException when deserializing and listen for KafkaNull. This solution works just fine.

I would however like to be able to throw an exception instead of returning null.

KIP-161 and KIP-210 provide better features to handling exceptions. I did find some resources mentioning KIP-161 in Spring Cloud, but nothing specific about Spring-Kafka.

Does anyone know how to catch SerializationException in Spring Boot?

I am using Spring Boot 2.0.2

Edit: I found a solution.

I would rather throw an exception and catch it than having to return null or KafkaNull. I am using my custom Avro serializer and deserializer in multiple different project, some of which are not Spring. If I changed my Avro serializer and deserializer then some of the other projects would need to be changed to expect the deserializer to return null.

I would like to shutdown the container, such that I do not lose any messages. The SerializationException should never be expected in production. The SerializationException should only be able to happen if Schema Registry is down or if an unformatted message somehow is sent to the production kafka. Either way, SerializationException should only happen very rarely, and if it happens then I want to shutdown the container such that no messages are lost and I can investigate the issue.

Just take into consideration that will catch all exceptions from your consumer container. In my specific case I just want to only shutdown if it is a SerializationException

public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                       MessageListenerContainer container) {

        //Only call super if the exception is SerializationException
        if (thrownException instanceof SerializationException) {
            //This will shutdown the container.
            super.handle(thrownException, records, consumer, container);
        } else {
            //Wrap and re-throw the exception
            throw new KafkaException("Kafka Consumer Container Error", thrownException);
        }
    }
}

This handler is passed to the consumer container. Below is an example of a KafkaListenerContainerFactory bean.

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {

    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.getContainerProperties().setPollTimeout(3000);

    factory.getContainerProperties().setErrorHandler(new SerializationExceptionHandler());

    factory.getContainerProperties().setTransactionManager(chainedTxM(jpa, kafka));
    return factory;
}

Solution

  • There is nothing Spring can do; the deserialization occurs before the consumer gets any data. You need to enhance the deserializer.

    I would however like to be able to throw an exception instead of returning null.

    That won't help anything since Kafka won't know how to deal with the exception. Again; this all happens before the data is available so returning null (or some other special value) is the best technique.

    EDIT

    In 2.2, we added an error handling deserializer which delegates to the actual deserializer and returns null, with the exception in a header; the listener container then passes this directly to the error handler instead of the listener.