Search code examples
springapache-kafkaspring-kafkadead-letter

Spring Kafka dead message Queue and retries


I have a configuration:

@Configuration
@EnableKafka
public class ConsumerConfig {

    final DlqErrorHandler dlqErrorHandler;

    public ConsumerConfig(DlqErrorHandler dlqErrorHandler) {
        this.dlqErrorHandler = dlqErrorHandler;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "group_id_two");
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(false);
        factory.getContainerProperties().setAckOnError(false);
        factory.setConcurrency(2);
        factory.setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

There is an implementation of the error handler:

@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    private final KafkaTemplate kafkaTemplate;

    public DlqErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void handle(Exception e, List<ConsumerRecord<?, ?>> list, Consumer<?, ?> consumer, MessageListenerContainer messageListenerContainer) {
        ConsumerRecord<?, ?> record = list.get(0);

        try {
            kafkaTemplate.send("dlqTopic", record.key(), record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
        } catch (Exception exception) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            throw new KafkaException("Seek to current after exception", exception);
        }
    }
}

There are two listeners:

@Component
public class KafkaConsumer {
    @KafkaListener(topics = "batchProcessingWithRetryPolicy", containerFactory = "concurrentKafkaListenerContainerFactory")
    public void consume(String message) {
        System.out.println(message + " NORMAL");
        if (message.equals("TEST ERROR")) {
            throw new RuntimeException("EEEEEEEEEEEERRRRRRRRRRRRRRRRRRRRRRROOOOOOOOOOOOOOOOOOORRRRRR");
        }
    }

    @KafkaListener(topics = "dlqTopic", containerFactory = "concurrentKafkaListenerContainerFactory")
    public void consumeTwo(String message) {
        System.out.println(message + " DQL");
        if (message.length() > 0) {
            throw new RuntimeException("EEEEEEEEEEEERRRRRRRRRRRRRRRRRRRRRRROOOOOOOOOOOOOOOOOOORRRRRR ");
        }
    }
}

My question:

1)

factory.getContainerProperties().setAckOnError(false);

Method setAckOnError - deprecated. How can I replace this line of code so that the first listener after an error when processing a message does not make repeated attempts, but sends this message to DQL.

  1. How do I set a limit for DQL (DlqErrorHandler) on repetitions and time intervals between sending messages? That is, after the first error, the message appears in DQL, then I want to make 3 more attempts with an interval of 30 seconds and if it does not work, then go further.

Solution

  • ackOnError is replaced by ErrorHandler.isAckAfterHandle().

    Your error handler implementation is incomplete - you need to seek the other partitions in the remaining records list as well.

    Why don't you just use the SeekToCurrentErrorHandler and DeadLetterPublishingRecoverer provided by the framework. They support your use case.