Search code examples
javaconfigurationrabbitmqspring-cloud-streamdead-letter

Configuration to handle Dead-letter queue


I have a project that uses Spring Cloud Streams - RabbitMQ to exchange messages within micro-services. One thing that is critical for my project is that I must not lose any message.

In order to minimize failures, I planned the following:

  • Use the default retry method for messages in queue
  • Configure dead-letter queue to put messages again on queue after some time
  • To avoid an infinite loop, allow only a few times (let's say, 5) a message could be republished from dead-letter queue to regular messaging queue.

The first two items I believe I could make it using the configuration below:

#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=300000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=

#input
spring.cloud.stream.bindings.myInput.destination=my-queue
spring.cloud.stream.bindings.myInput.group=my-group

However, I could not find searching on this reference guide how to do what I want (mostly, how to configure a maximum number of republish from dead-letter queue). I'm not completely sure I'm on the right path - maybe I should manually create a second queue and code what I want, and leave dead-letter only to messages that completely failed (which I must check regularly and handle manually, since my system should not lose any messages)...

I'm new to these frameworks, and I would like your help to configure mine...


Solution

  • This documentation for the rabbit binder shows how to publish a dead-letter to some parking-lot queue after some number of retries have failed.

    @SpringBootApplication
    public class ReRouteDlqApplication {
    
        private static final String ORIGINAL_QUEUE = "so8400in.so8400";
    
        private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    
        private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
    
        private static final String X_RETRIES_HEADER = "x-retries";
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
            System.out.println("Hit enter to terminate");
            System.in.read();
            context.close();
        }
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RabbitListener(queues = DLQ)
        public void rePublish(Message failedMessage) {
            Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
            if (retriesHeader == null) {
                retriesHeader = Integer.valueOf(0);
            }
            if (retriesHeader < 3) {
                failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
                this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
            }
            else {
                this.rabbitTemplate.send(PARKING_LOT, failedMessage);
            }
        }
    
        @Bean
        public Queue parkingLot() {
            return new Queue(PARKING_LOT);
        }
    
    }
    

    The second example shows how to use the delayed exchange plugin to delay between retries.

    @SpringBootApplication
    public class ReRouteDlqApplication {
    
        private static final String ORIGINAL_QUEUE = "so8400in.so8400";
    
        private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    
        private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
    
        private static final String X_RETRIES_HEADER = "x-retries";
    
        private static final String DELAY_EXCHANGE = "dlqReRouter";
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
            System.out.println("Hit enter to terminate");
            System.in.read();
            context.close();
        }
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RabbitListener(queues = DLQ)
        public void rePublish(Message failedMessage) {
            Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
            Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
            if (retriesHeader == null) {
                retriesHeader = Integer.valueOf(0);
            }
            if (retriesHeader < 3) {
                headers.put(X_RETRIES_HEADER, retriesHeader + 1);
                headers.put("x-delay", 5000 * retriesHeader);
                this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
            }
            else {
                this.rabbitTemplate.send(PARKING_LOT, failedMessage);
            }
        }
    
        @Bean
        public DirectExchange delayExchange() {
            DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
            exchange.setDelayed(true);
            return exchange;
        }
    
        @Bean
        public Binding bindOriginalToDelay() {
            return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
        }
    
        @Bean
        public Queue parkingLot() {
            return new Queue(PARKING_LOT);
        }
    
    }