java rabbitmq spring-amqp spring-rabbit

How to move error message to rabbitmq dead letter queue


I have read a lot of documentation and Stack Overflow answers, but I still cannot get a message moved to the dead letter queue when an exception occurs. I'm using Spring Boot. Here is my configuration:

@Autowired
private RabbitTemplate rabbitTemplate;

@Bean
RetryOperationsInterceptor interceptor() {
    RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, "error_exchange ", "error_key");
    return RetryInterceptorBuilder
        .stateless()
        .recoverer(recoverer)
        .build();
}

Dead letter queue:

Name: error
Features:
    x-dead-letter-routing-key: error_key
    x-dead-letter-exchange: error_exchange
    durable: true
Policy: DLX

My exchange:

Name: error_exchange
Binding: to queue error, routing key error_key

Here is my consumer:

@RabbitListener(queues = "${rss_reader_chat_queue}")
public void consumeMessage(Message message) {
    try {
        List<ChatMessage> chatMessages = messageTransformer.transformMessage(message);
        List<ChatMessage> save = chatMessageRepository.save(chatMessages);
        sendMessagesToChat(save);
    }
    catch (Exception ex) {
        throw new AmqpRejectAndDontRequeueException(ex);
    }
}

So when I send an invalid message and an exception occurs, the message is delivered only once (which is good, because previously it was redelivered over and over again), but it doesn't end up in my dead letter queue. Can you help me with this?


Solution

  • You need to show the rest of your configuration: Boot properties, queue @Beans, etc. You also seem to be confusing a republishing recoverer with dead letter queues; they are different ways to achieve similar results, and you typically wouldn't use both.

    Here's a simple boot app that demonstrates using a DLX/DLQ...

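    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.springframework.amqp.AmqpRejectAndDontRequeueException;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication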
    public class So43694619Application implements CommandLineRunner {
    
        public static void main(String[] args) {
            ConfigurableApplicationContext context = SpringApplication.run(So43694619Application.class, args);
            context.close();
        }
    
        @Autowired
        RabbitTemplate template;
    
        @Autowired
        AmqpAdmin admin;
    
        private final CountDownLatch latch = new CountDownLatch(1);
    
        @Override
        public void run(String... arg0) throws Exception {
            this.template.convertAndSend("so43694619main", "foo");
            this.latch.await(10, TimeUnit.SECONDS);
            // Clean up the exchange and both queues declared by this demo.
            this.admin.deleteExchange("so43694619dlx");
            this.admin.deleteQueue("so43694619main");
            this.admin.deleteQueue("so43694619dlq");
        }
    
    
        @Bean
        public Queue main() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange", "so43694619dlx");
            args.put("x-dead-letter-routing-key", "so43694619dlxRK");
            return new Queue("so43694619main", true, false, false, args);
        }
    
        @Bean
        public Queue dlq() {
            return new Queue("so43694619dlq");
        }
    
        @Bean
        public DirectExchange dlx() {
            return new DirectExchange("so43694619dlx");
        }
    
        @Bean
        public Binding dlqBinding() {
            return BindingBuilder.bind(dlq()).to(dlx()).with("so43694619dlxRK");
        }
    
        @RabbitListener(queues = "so43694619main")
        public void listenMain(String in) {
            throw new AmqpRejectAndDontRequeueException("failed");
        }
    
        @RabbitListener(queues = "so43694619dlq")
        public void listenDlq(String in) {
            System.out.println("ReceivedFromDLQ: " + in);
            this.latch.countDown();
        }
    
    }
    

    Result:

    ReceivedFromDLQ: foo
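
For comparison, the other route mentioned above, a republishing recoverer, could be wired roughly as follows. This is only a sketch under assumptions, not the asker's actual setup: it assumes a Spring AMQP version that builds on Spring Retry, it reuses the error_exchange and error_key names from the question (written without the trailing space that appears in the question's exchange string, which may be worth checking), and the class name RepublishConfig and the maxAttempts value of 3 are made up for illustration. A RetryOperationsInterceptor bean on its own is not applied to listeners; it has to be placed in the container factory's advice chain.

```java
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class RepublishConfig {

    @Bean
    public RetryOperationsInterceptor retryInterceptor(RabbitTemplate rabbitTemplate) {
        // Once retries are exhausted, the recoverer republishes the failed
        // message to the given exchange with the given routing key.
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(3) // assumed value, tune as needed
                .recoverer(new RepublishMessageRecoverer(rabbitTemplate, "error_exchange", "error_key"))
                .build();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Attach the interceptor so @RabbitListener methods are wrapped by it.
        factory.setAdviceChain(retryInterceptor);
        return factory;
    }
}
```

With this approach the application itself publishes the failed message, so the source queue needs no x-dead-letter-* arguments. With the DLX route shown in the demo, by contrast, those arguments belong on the queue the listener consumes from (so43694619main in the demo), not on the dead letter queue. The question lists them under the queue named error, which may be part of the problem.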