hi am trying to achieve dead letter exchange in spring integration XML so the scenario is AAA exchange binded BBB queue if BBB queue getting failed in some scenario like lister threw exception i want to navigate exception to Dead exchange queue to store message below is the code
created sample project
main.java
package com.spring.rabbit.first.deadletter;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Main {
public static void main(String[] args) {
new ClassPathXmlApplicationContext("/applicationContext.xml");
}
}
xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- Spring configuration -->
<context:component-scan base-package="com.spring.rabbit.first" />
<context:mbean-export default-domain="com.spring.rabbit.first.deadletter" />
<!-- RabbitMQ common configuration -->
<rabbit:connection-factory id="connectionFactory"
username="guest" password="guest" port="5672" virtual-host="/" host="localhost" />
<!-- <rabbit:connection-factory id="connectionFactory"/> -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- Queues -->
<rabbit:queue id="springQueue" name="spring.queue"
auto-delete="true" durable="false" />
<rabbit:listener-container
connection-factory="connectionFactory" advice-chain="retryAdvice">
<rabbit:listener queues="BBBqueue" ref="messageListener" />
</rabbit:listener-container>
<bean id="messageListener" class="com.spring.rabbit.first.deadletter.MessageHandler" />
<bean id="retryAdvice"
class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
<property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer" />
<property name="retryOperations" ref="retryTemplate" />
</bean>
<bean id="rejectAndDontRequeueRecoverer"
class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="2000" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="30000" />
</bean>
</property>
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3" />
</bean>
</property>
</bean>
<rabbit:topic-exchange name="AAAqueue">
<rabbit:bindings>
<rabbit:binding queue="BBBqueue" pattern="" />
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:queue name="BBBqueue">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="XXX.dead.letter"></entry>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
</rabbit:queue-arguments>
</rabbit:queue>
<!-- dead letter -->
<rabbit:topic-exchange name="XXX.dead.letter">
<rabbit:bindings>
<rabbit:binding queue="XXX.dead.letter.queue" pattern=""></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:queue name="XXX.dead.letter.queue"></rabbit:queue>
</beans>
Message handler
package com.spring.rabbit.first.deadletter;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class MessageHandler implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("Received message: " + message);
System.out.println("Text: " + new String(message.getBody()));
message = null;
if (message == null) {
throw new NullPointerException();
}
}
}
Messagesender
package com.spring.rabbit.first.deadletter;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Service;
@Service
@ManagedResource
public class MessageSender {
@Autowired
private AmqpTemplate template;
@ManagedOperation
public void send(String text) {
send("amq.fanout", "NDPAR.SPRING.JAVA", text);
}
@ManagedOperation
public void send(String exchange, String key, String text) {
template.convertAndSend(exchange, key, text);
}
}
output:
23:57:33.753 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
23:57:33.777 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:06.952 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 2000
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:12.888 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 20000
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
23:58:42.393 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] WARN org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer -
Retries exhausted for message (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue,
receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
But still i did't see any message in dead letter queue.is am missing anything?? kindly help on this
I am not sure what your XXX-channel
and adapter are supposed to do, but you need to add a RejectAndDontRequeueRecoverer
to the retry advice factory bean (in the messageRecoverer
property).
The default recover just logs that retries are exhausted and discards the message.
EDIT
Here is a custom MessageRecoverer
that publishes a failed message from queue A
to a queue named A.dlq
- the queue and binding are declared automatically, as needed.
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
public class AutoConfiguringRepublishMessageRecoverer implements MessageRecoverer {
public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";
public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";
private final Log logger = LogFactory.getLog(getClass());
private final RabbitTemplate errorTemplate;
private final RabbitAdmin admin;
private final String deadLetterExchangeName = "DLX";
private final DirectExchange deadletterExchange = new DirectExchange(this.deadLetterExchangeName);
private boolean initialized;
public AutoConfiguringRepublishMessageRecoverer(RabbitTemplate errorTemplate) {
this.errorTemplate = errorTemplate;
this.admin = new RabbitAdmin(errorTemplate.getConnectionFactory());
}
@Override
public void recover(Message message, Throwable cause) {
if (!this.initialized) {
initialize();
}
Map<String, Object> headers = message.getMessageProperties().getHeaders();
headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());
String dlqName = message.getMessageProperties().getConsumerQueue() + ".dlq";
if (this.admin.getQueueProperties(dlqName) == null) {
bindDlq(dlqName);
}
this.errorTemplate.send(this.deadLetterExchangeName, dlqName, message);
if (this.logger.isWarnEnabled()) {
this.logger.warn("Republishing failed message to " + dlqName);
}
}
private void initialize() {
this.admin.declareExchange(this.deadletterExchange);
this.initialized = true;
}
private void bindDlq(String dlqName) {
Queue dlq = new Queue(dlqName);
this.admin.declareQueue(dlq);
this.admin.declareBinding(BindingBuilder.bind(dlq).to(this.deadletterExchange).with(dlqName));
}
private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
}
}