I'm working on a Spring Boot 2.7.9 project. This project has 3 Kafka listeners configured as follows:
    @Configuration
    @EnableKafka
    public class ConsumerKafkaConfig {

        private final KafkaProperties kafkaProperties;

        public ConsumerKafkaConfig(KafkaProperties kafkaProperties) {
            this.kafkaProperties = kafkaProperties;
        }

        @Bean
        public ConsumerFactory<String, Object> firstEventFactory() {
            Map<String, Object> props = kafkaProperties.buildConsumerProperties();
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Object> firstContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(firstEventFactory());
            return factory;
        }

        @Bean
        public ConsumerFactory<Object, Object> secondConsumerEventFactory() {
            final Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            return new DefaultKafkaConsumerFactory<>(properties);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<Object, Object> secondConsumerEventContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(secondConsumerEventFactory());
            return factory;
        }

        @Bean
        public ConsumerFactory<String, Object> thirdConsumerEventFactory() {
            final Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
            properties.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
            properties.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
            properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            return new DefaultKafkaConsumerFactory<>(properties);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Object> thirdConsumerEventContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(thirdConsumerEventFactory());
            return factory;
        }
    }
For the container factories firstContainerFactory and secondConsumerEventContainerFactory, if messages cannot be processed, I want them to be published to specific dead letter queues:
firstContainerFactory sends errors to a topic named my-first-dlq
secondConsumerEventContainerFactory sends errors to a topic named my-second-dlq
With the latest versions of Spring, the setErrorHandler() method is now deprecated and replaced by setCommonErrorHandler(). However, I am having trouble finding how to publish messages to specific DLQs.
On the other hand, for thirdConsumerEventContainerFactory, I just want to log an error if something goes wrong.
I hope someone can help me with this.
Thanks a lot :)
See DefaultErrorHandler, which takes a ConsumerRecordRecoverer as a constructor argument. That recoverer can be a DeadLetterPublishingRecoverer, into which you inject a KafkaTemplate and a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver. The default resolver looks like this:
    private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
            DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
So you can probably write your own functions that resolve to my-first-dlq and my-second-dlq, respectively, and give each container factory its own DefaultErrorHandler.
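A minimal sketch of that wiring, under some assumptions: the class name DlqErrorHandlers and the method publishingTo are made up for illustration, the retry settings (two retries, one second apart) are arbitrary, and the KafkaTemplate passed in is assumed to be configured with serializers that can write the failed keys and values (here, Avro). A negative partition is used so that Kafka picks the partition, which avoids requiring the DLQ topic to have as many partitions as the source topic.

```java
import java.util.function.BiFunction;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical helper class, not part of Spring Kafka.
public final class DlqErrorHandlers {

    private DlqErrorHandlers() {
    }

    // Builds an error handler that retries a failed record and then
    // publishes it to the given dead letter topic.
    public static CommonErrorHandler publishingTo(KafkaTemplate<Object, Object> template, String dlqTopic) {
        // Always resolve to the same topic; partition -1 leaves the
        // partition choice to the Kafka producer.
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> resolver =
                (cr, e) -> new TopicPartition(dlqTopic, -1);

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, resolver);

        // Assumed retry policy: 2 retries, 1000 ms apart, then the
        // record is handed to the recoverer.
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }
}
```

Inside the existing factory beans this would be used as factory.setCommonErrorHandler(DlqErrorHandlers.publishingTo(template, "my-first-dlq")) in firstContainerFactory and with "my-second-dlq" in secondConsumerEventContainerFactory, with the KafkaTemplate injected as a bean method parameter.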
To just log the error, there is a CommonLoggingErrorHandler.
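For the log-only case, a sketch could look like the following. The class LoggingOnlyFactories and the method loggingOnly are hypothetical names; only setCommonErrorHandler and CommonLoggingErrorHandler come from Spring Kafka (2.8 or later, which is what Spring Boot 2.7 manages).

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.CommonLoggingErrorHandler;

// Hypothetical helper class, not part of Spring Kafka.
public final class LoggingOnlyFactories {

    private LoggingOnlyFactories() {
    }

    // Creates a container factory whose listener failures are only logged.
    public static <K, V> ConcurrentKafkaListenerContainerFactory<K, V> loggingOnly(
            ConsumerFactory<K, V> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<K, V> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // No retries and no dead letter publishing: the exception is logged.
        factory.setCommonErrorHandler(new CommonLoggingErrorHandler());
        return factory;
    }
}
```

In the configuration from the question, the same effect is obtained by adding factory.setCommonErrorHandler(new CommonLoggingErrorHandler()) to the container factory that should only log.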
See more in docs: https://docs.spring.io/spring-kafka/reference/html/#dead-letters