I have read a lot of documentation and Stack Overflow answers, but I still can't get a message moved to the dead letter queue when an exception occurs. I'm using Spring Boot. Here is my configuration:
@Autowired
private RabbitTemplate rabbitTemplate;

@Bean
RetryOperationsInterceptor interceptor() {
    RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, "error_exchange ", "error_key");
    return RetryInterceptorBuilder
            .stateless()
            .recoverer(recoverer)
            .build();
}
Dead letter queue:
Features:
    x-dead-letter-routing-key: error_key
    x-dead-letter-exchange: error_exchange
    durable: true
Policy: DLX
Name of the queue: error
My exchange: name: error_exchange; binding: to: error, routing_key: error_key
Here is my consumer:
@RabbitListener(queues = "${rss_reader_chat_queue}")
public void consumeMessage(Message message) {
    try {
        List<ChatMessage> chatMessages = messageTransformer.transformMessage(message);
        List<ChatMessage> save = chatMessageRepository.save(chatMessages);
        sendMessagesToChat(save);
    }
    catch (Exception ex) {
        throw new AmqpRejectAndDontRequeueException(ex);
    }
}
So when I send an invalid message and an exception occurs, the message is delivered only once (which is good, because previously it was redelivered over and over again), but it doesn't go to my dead letter queue. Can you help me with this?
You need to show the rest of your configuration: Boot properties, queue @Beans, etc. You also seem to be confusing a republishing recoverer with dead letter queues; they are different ways to achieve similar results, and you typically wouldn't use both.
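For comparison, the republishing-recoverer route could be wired roughly as follows. This is only a sketch under assumptions: it guesses that the interceptor from the question is not attached to any listener container, and the class name RepublishConfig and the maxAttempts value are made up for illustration. The exchange and routing key names are taken from the question.

```java
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

// Hypothetical configuration class; not part of the original question or answer.
@Configuration
public class RepublishConfig {

    @Bean
    public RetryOperationsInterceptor retryInterceptor(RabbitTemplate rabbitTemplate) {
        // Once retries are exhausted, the failed message is published to
        // error_exchange with routing key error_key.
        RepublishMessageRecoverer recoverer =
                new RepublishMessageRecoverer(rabbitTemplate, "error_exchange", "error_key");
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(3) // assumed value, chosen only for illustration
                .recoverer(recoverer)
                .build();
    }

    // Declaring a bean with this name replaces Boot's auto-configured factory,
    // so settings applied through Boot properties would have to be repeated here.
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Without this line the interceptor bean exists but is never applied to listeners.
        factory.setAdviceChain(retryInterceptor);
        return factory;
    }
}
```

With this route the recoverer, not the broker, moves the message, so the queue needs no x-dead-letter-* arguments and the listener can let the original exception propagate.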
Here's a simple boot app that demonstrates using a DLX/DLQ...
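import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So43694619Application implements CommandLineRunner {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So43694619Application.class, args);
        context.close();
    }

    @Autowired
    RabbitTemplate template;

    @Autowired
    AmqpAdmin admin;

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    public void run(String... arg0) throws Exception {
        this.template.convertAndSend("so43694619main", "foo");
        this.latch.await(10, TimeUnit.SECONDS);
        // Clean up the broker objects declared by this demo.
        this.admin.deleteExchange("so43694619dlx");
        this.admin.deleteQueue("so43694619main");
        this.admin.deleteQueue("so43694619dlq");
    }

    // The dead-letter arguments go on the main queue, not on the DLQ.
    @Bean
    public Queue main() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "so43694619dlx");
        args.put("x-dead-letter-routing-key", "so43694619dlxRK");
        return new Queue("so43694619main", true, false, false, args);
    }

    @Bean
    public Queue dlq() {
        return new Queue("so43694619dlq");
    }

    @Bean
    public DirectExchange dlx() {
        return new DirectExchange("so43694619dlx");
    }

    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with("so43694619dlxRK");
    }

    // Rejecting without requeue makes the broker route the message to the DLX.
    @RabbitListener(queues = "so43694619main")
    public void listenMain(String in) {
        throw new AmqpRejectAndDontRequeueException("failed");
    }

    @RabbitListener(queues = "so43694619dlq")
    public void listenDlq(String in) {
        System.out.println("ReceivedFromDLQ: " + in);
        this.latch.countDown();
    }
}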
Result:
ReceivedFromDLQ: foo
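Applied to the names in the question, the same DLX/DLQ pattern might look like the sketch below. The listing in the question appears to show the x-dead-letter-* arguments on the error queue itself; the broker reads them from the queue where the message is rejected, so here they sit on the main queue. The queue name "rss_reader_chat" and the class name DeadLetterConfig are placeholders, since the question does not show what ${rss_reader_chat_queue} resolves to.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class using the exchange, queue and key names from the question.
@Configuration
public class DeadLetterConfig {

    // Placeholder: stands in for whatever ${rss_reader_chat_queue} resolves to.
    static final String MAIN_QUEUE = "rss_reader_chat";

    // Messages rejected from this queue are dead-lettered to error_exchange.
    @Bean
    public Queue mainQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "error_exchange");
        args.put("x-dead-letter-routing-key", "error_key");
        return new Queue(MAIN_QUEUE, true, false, false, args);
    }

    // The dead letter queue itself needs no dead-letter arguments.
    @Bean
    public Queue errorQueue() {
        return new Queue("error");
    }

    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error_exchange");
    }

    @Bean
    public Binding errorBinding() {
        return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error_key");
    }
}
```

If the main queue already exists on the broker with different arguments, redeclaring it with these arguments fails, so the existing queue would have to be deleted first or the dead-letter settings applied through a policy instead.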