I need to create an application using stateful retry that listens to a Kafka topic, makes calls to some APIs, and then commits the message. If an error occurs in one of these calls, for example a timeout, the application must retry 4 times with an interval of 4 seconds. If it still hasn't worked after these four attempts, the application should send the message to a DLQ topic.
Sending to the DLQ topic is the part I can't get working. When I tried to configure the DLQ, the retries did not stop and nothing was sent to the DLQ either.
@KafkaListener(topics = "${topic.name}", concurrency = "1")
public void listen(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem,
                   @Headers final MessageHeaders headers,
                   Acknowledgment ack) {
    AberturaContaLimiteCreditoCalculadoData dados;
    if (!validarMensagem(mensagem)) {
        dados = mensagem.value().getData();
    }
    // This RuntimeException is thrown only to force a retry.
    throw new RuntimeException();
}

private boolean validarMensagem(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem) {
    return mensagem == null || mensagem.value() == null;
}

public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(final ConsumerFactory<String, Object> consumerFactory) {
    final ConcurrentKafkaListenerContainerFactory<String, Object> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            new FixedBackOff(4000L, 4L)));
    return factory;
}

public DeadLetterPublishingRecoverer publisherRetryDLQ() {
    return new DeadLetterPublishingRecoverer(createKafkaTemplate(),
            (record, ex) -> new TopicPartition(topicoDLQ, 0));
}

public ProducerFactory<String, String> producerFactory() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    return new DefaultKafkaProducerFactory<>(config);
}

public KafkaOperations<String, String> createKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

Edit 2022-05-04:
With your tip about RetryListener and setting logging.level to DEBUG, we managed to find the problem: the Producer was not being built.
The problem now is that the record we consume uses a different Avro schema from the DLQ's Avro schema. The difference is that the DLQ schema has an extra field that must store the reason for the error.
2022/05/04 16:53:43.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [ERROR] o.s.k.l.DeadLetterPublishingRecoverer - Dead-letter publication to limites-abertura-conta-limite-credito-calculado-convivenciaaberturaconta-dlq failed for: limites-abertura-conta-limite-credito-calculado-0@6
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema{...}]}}]}
Is there a way to do this conversion?
If I understand the question properly, you want to create a ProducerRecord with a different value type.
Simply subclass the DLPR (DeadLetterPublishingRecoverer) and override createProducerRecord():
/**
 * Subclasses can override this method to customize the producer record to send to the
 * DLQ. The default implementation simply copies the key and value from the consumer
 * record and adds the headers. The timestamp is not set (the original timestamp is in
 * one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
 * less than 0, it must be set to null in the {@link ProducerRecord}.
 * @param record the failed record
 * @param topicPartition the {@link TopicPartition} returned by the destination
 * resolver.
 * @param headers the headers - original record headers plus DLT headers.
 * @param key the key to use instead of the consumer record key.
 * @param value the value to use instead of the consumer record value.
 * @return the producer record to send.
 * @see KafkaHeaders
 */
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
        TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {

    return new ProducerRecord<>(topicPartition.topic(),
            topicPartition.partition() < 0 ? null : topicPartition.partition(),
            key != null ? key : record.key(),
            value != null ? value : record.value(), headers);
}

You can examine the headers to determine the exception that caused the failure. If you need the actual exception, override accept() to capture it in a ThreadLocal, then call super.accept(); you can then use the thread local in createProducerRecord().
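
The accept()/ThreadLocal pattern can be sketched in plain Java. This is a dependency-free model of the idea, not Spring Kafka code: BaseRecoverer is a hypothetical stand-in for DeadLetterPublishingRecoverer, and DlqValue with its reason field is an assumed shape for the DLQ payload. In a real application the subclass would extend DeadLetterPublishingRecoverer and build a ProducerRecord carrying the DLQ Avro type.

```java
import java.util.function.BiConsumer;

public class ThreadLocalRecovererSketch {

    /** Assumed DLQ payload: the original value plus the failure reason. */
    static class DlqValue {
        final String original;
        final String reason;

        DlqValue(String original, String reason) {
            this.original = original;
            this.reason = reason;
        }
    }

    /** Hypothetical stand-in for DeadLetterPublishingRecoverer. */
    static class BaseRecoverer implements BiConsumer<String, Exception> {
        Object lastPublished;

        @Override
        public void accept(String value, Exception ex) {
            // The base class builds the outgoing record without passing the exception on.
            lastPublished = createProducerRecord(value);
        }

        protected Object createProducerRecord(String value) {
            return value; // default: copy the value unchanged
        }
    }

    /** Captures the exception in accept() so createProducerRecord() can read it. */
    static class ReasonAddingRecoverer extends BaseRecoverer {
        private static final ThreadLocal<Exception> FAILURE = new ThreadLocal<>();

        @Override
        public void accept(String value, Exception ex) {
            FAILURE.set(ex);
            try {
                super.accept(value, ex);
            } finally {
                FAILURE.remove(); // do not leak the exception to the next record
            }
        }

        @Override
        protected Object createProducerRecord(String value) {
            Exception ex = FAILURE.get();
            String reason = ex == null ? "unknown" : String.valueOf(ex.getMessage());
            return new DlqValue(value, reason);
        }
    }

    public static void main(String[] args) {
        ReasonAddingRecoverer recoverer = new ReasonAddingRecoverer();
        recoverer.accept("payload", new RuntimeException("timeout"));
        DlqValue sent = (DlqValue) recoverer.lastPublished;
        System.out.println(sent.original + " / " + sent.reason);
    }
}
```

Clearing the ThreadLocal in a finally block keeps a stale exception from being attached to a later record handled on the same consumer thread.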
There are several solutions to publish the different type with the same producer factory.
See "KafkaConsumer With Multiple Different Avro Producers And Transactions" for an example KafkaTemplate that uses a different serializer (KafkaTemplate has a constructor where you can override the producer factory configs).
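
On the retry requirement in the question: the second argument of FixedBackOff counts retries, so FixedBackOff(4000L, 4L) gives one initial delivery plus four retries, and the recoverer only runs if one is passed to the error handler. The plain-Java model below sketches that retry-then-recover flow. It is an illustration under those assumptions, not Spring's implementation; deliver and its parameters are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetryThenRecoverSketch {

    /**
     * Delivers a record to the listener. On failure, waits intervalMs and redelivers,
     * up to maxRetries times. Once retries are exhausted, the record is handed to the
     * recoverer (for example, a DLQ publisher). Returns the number of delivery attempts.
     */
    static <T> int deliver(T record, Consumer<T> listener, Consumer<T> recoverer,
                           long intervalMs, int maxRetries) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts; // success, nothing to recover
            } catch (RuntimeException ex) {
                if (attempts > maxRetries) {
                    recoverer.accept(record); // retries exhausted
                    return attempts;
                }
                try {
                    Thread.sleep(intervalMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted during back-off", ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        // A listener that always fails, with a short interval to keep the demo fast.
        int attempts = deliver("record-1",
                r -> { throw new RuntimeException("timeout"); },
                dlq::add, 10L, 4);
        System.out.println(attempts + " attempts, DLQ=" + dlq); // 5 attempts, DLQ=[record-1]
    }
}
```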