Search code examples
spring-kafka

How to capture the exception and message key when using ErrorHandlingDeserializer2 to handle exceptions during deserialization


I'm using spring boot 2.1.7.RELEASE and spring-kafka 2.2.8.RELEASE.And I'm using @KafkaListener annotation to create a consumer and I'm using all default settings for the consumer.And I'm using below configuration as specified in the Spring-Kafka documentation.

// other props
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class.getName());
    return new DefaultKafkaConsumerFactory<>(props);

Now, My requirements are when there is a deserialization exception. Please suggest how can i do this.

  1. Capture the exception and the record key to log the details
  2. Send this record to DLT

Solution

  • Add a SeekToCurrentErrorHandler, configured with a DeadLetterPublishingRecoverer to the container.

    See the documentation and here.

    Deserialization exceptions are treated as fatal and will not be retried.

    If you want to write your own error handler for some reason, the code in the DeadLetterPublishingRecoverer shows how the exception information is extracted.