
How to deal with multiple Kafka Listener and multiple Dead Letter Queue with Spring-Kafka?


I'm working on a Spring Boot 2.7.9 project. This project has 3 Kafka listeners configured as follows:

@Configuration
@EnableKafka
public class ConsumerKafkaConfig {

    private final KafkaProperties kafkaProperties;

    public ConsumerKafkaConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ConsumerFactory<String, Object> firstEventFactory() {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> firstContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(firstEventFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<Object, Object> secondConsumerEventFactory() {
        final Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> secondConsumerEventContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(secondConsumerEventFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Object> thirdConsumerEventFactory() {
        final Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
        properties.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        properties.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(
                properties
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> thirdConsumerEventContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(thirdConsumerEventFactory());
        return factory;
    }

}

For the container factories firstContainerFactory and secondConsumerEventContainerFactory, if messages cannot be processed, I want them to be published to specific dead letter queues:

  • firstContainerFactory sends errors to a topic named my-first-dlq
  • secondConsumerEventContainerFactory sends errors to a topic named my-second-dlq

With recent versions of Spring Kafka, the setErrorHandler() method is deprecated and replaced by setCommonErrorHandler(). However, I am having trouble finding out how to publish messages to specific DLQs.

On the other hand, for thirdConsumerEventContainerFactory I just want to log the error if something goes wrong.

I hope someone can help me with this.

Thanks a lot :)


Solution

  • See DefaultErrorHandler, which takes a ConsumerRecordRecoverer as a constructor argument. That recoverer can be a DeadLetterPublishingRecoverer, into which you inject a KafkaTemplate and a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver. The default resolver looks like this:

    private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
        DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
    

    So, you can supply your own function that resolves to my-first-dlq and my-second-dlq, respectively.

    To just log the error, there is a CommonLoggingErrorHandler.

    See more in docs: https://docs.spring.io/spring-kafka/reference/html/#dead-letters
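Putting that together, here is a minimal sketch of how the first and third factory beans from the question could be rewired. The dlqKafkaTemplate bean name is an assumption (any KafkaTemplate able to publish the failed records will do), as is the choice to keep the original partition in the destination resolver:

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonLoggingErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> firstContainerFactory(
        KafkaTemplate<Object, Object> dlqKafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(firstEventFactory());

    // Route every failed record to my-first-dlq, keeping the original
    // partition. This requires the DLQ to have at least as many partitions
    // as the source topic; return a negative partition instead to let
    // Kafka choose one.
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
            dlqKafkaTemplate,
            (record, ex) -> new TopicPartition("my-first-dlq", record.partition()));
    factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer));
    return factory;
}

// secondConsumerEventContainerFactory is wired the same way,
// resolving to my-second-dlq instead.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> thirdConsumerEventContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(thirdConsumerEventFactory());

    // No DLQ here: just log the failure and continue.
    factory.setCommonErrorHandler(new CommonLoggingErrorHandler());
    return factory;
}
```

Note that DefaultErrorHandler retries a few times by default before invoking the recoverer; pass a BackOff as a second constructor argument if you want different retry behavior.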