How can I send the records causing a DeserializationException to a DLT while consuming messages from a Kafka topic using SeekToCurrentErrorHandler?


I'm using spring boot 2.1.7.RELEASE and spring-kafka 2.2.8.RELEASE. We are in the process of upgrading the spring boot version but for now, we are using this spring-kafka version.

I'm using the @KafkaListener annotation to create a consumer, with all default settings for the consumer. I'm using the configuration below, as specified in the Spring-Kafka documentation.

    // other props
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, AvroDeserializer.class.getName());
    return new DefaultKafkaConsumerFactory<>(props);

I've implemented a custom error handler by extending SeekToCurrentErrorHandler, to capture the records that cause a deserialization exception and send them to a DLT.

The problem is that when I test this logic with 30 messages, where every other message causes a deserialization exception, the list passed to the handle method contains all 30 messages instead of only the 15 messages that cause the exception. How can I get only the records with the exception? Please suggest.

Here is my custom SeekToCurrentErrorHandler code:

    @Component
    public class MySeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

        private final MyDeadLetterRecoverer deadLetterRecoverer;

        @Autowired
        public MySeekToCurrentErrorHandler(MyDeadLetterRecoverer deadLetterRecoverer) {
            super(-1);
            this.deadLetterRecoverer = deadLetterRecoverer;
        }

        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> data, Consumer<?, ?> consumer, MessageListenerContainer container) {
            if (thrownException instanceof DeserializationException) {
                //Improve to support multiple records
                DeserializationException deserializationException = (DeserializationException) thrownException;
                deadLetterRecoverer.accept(data.get(0), deserializationException);
            
                ConsumerRecord<?, ?> consumerRecord = data.get(0);
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            } else {
                //Calling super method to let the 'SeekToCurrentErrorHandler' do what it is actually designed for
                super.handle(thrownException, data, consumer, container);
            }
        }
    }

Solution

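  • The error handler is passed the failed record together with all the remaining records, so that the SeekToCurrentErrorHandler (STCEH) can re-seek all partitions for the records that weren't processed. The failed record is the first element of the list; the rest are not failures.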

    After you recover the failed record, use SeekUtils to seek the remaining records (remove the one that you have recovered from the list).

    Set recoverable to false so that doSeeks() doesn't try to recover the new first record.

        /**
         * Seek records to earliest position, optionally skipping the first.
         * @param records the records.
         * @param consumer the consumer.
         * @param exception the exception
         * @param recoverable true if skipping the first record is allowed.
         * @param skipper function to determine whether or not to skip seeking the first.
         * @param logger a {@link Log} for seek errors.
         * @return true if the failed record was skipped.
         */
        public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception,
                boolean recoverable, BiPredicate<ConsumerRecord<?, ?>, Exception> skipper, Log logger) {
    

    You won't need all this code when you move to a more recent version (Boot 2.1 and Spring for Apache Kafka 2.2 are no longer supported).
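
The recover-then-seek step described above can be sketched in plain Java, without any Spring or Kafka dependency. This is only an illustration of the idea, not the library's implementation: `SeekSketch`, `Rec`, and `recoverFirstAndSeekRest` are hypothetical names, `Rec` stands in for `ConsumerRecord`, and the returned map stands in for the seeks that would be issued on the consumer. It assumes, as in the question, that the first element of the list is the record that failed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the error handler logic; none of these names exist in spring-kafka.
class SeekSketch {

    /** Minimal stand-in for a ConsumerRecord: just a partition name and an offset. */
    static final class Rec {
        final String partition;
        final long offset;

        Rec(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * data.get(0) is the failed record; the rest were fetched but not yet processed.
     * Sends only the failed record to the (simulated) DLT and returns, per partition,
     * the offset the consumer should seek back to so the other records are redelivered.
     */
    static Map<String, Long> recoverFirstAndSeekRest(List<Rec> data, List<Rec> deadLetters) {
        deadLetters.add(data.get(0));                        // recover the failed record only
        List<Rec> remaining = data.subList(1, data.size());  // drop it from the list to seek
        Map<String, Long> seekTo = new LinkedHashMap<>();
        for (Rec r : remaining) {
            // Records of a partition arrive in offset order, so the first one seen is the earliest.
            seekTo.putIfAbsent(r.partition, r.offset);
        }
        return seekTo;
    }

    public static void main(String[] args) {
        List<Rec> data = Arrays.asList(
                new Rec("topic-0", 5), new Rec("topic-0", 6), new Rec("topic-1", 2), new Rec("topic-0", 7));
        List<Rec> deadLetters = new ArrayList<>();
        Map<String, Long> seekTo = recoverFirstAndSeekRest(data, deadLetters);
        System.out.println(deadLetters.size() + " " + seekTo); // prints: 1 {topic-0=6, topic-1=2}
    }
}
```

In the real handler, the equivalent step would presumably be a call along the lines of `SeekUtils.doSeeks(data.subList(1, data.size()), consumer, thrownException, false, (rec, ex) -> false, logger)` after the recoverer has accepted `data.get(0)`, where `logger` is whatever `Log` instance the handler has available; check the exact signature against the version in use.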