Search code examples
javaxmlspringrabbitmqspring-rabbit

Rabbitmq dead letter spring integration xml


Hi, I am trying to set up a dead letter exchange using Spring Integration XML configuration. The scenario: exchange AAA is bound to queue BBB. If processing a message from queue BBB fails (for example, the listener throws an exception), I want the message to be routed to the dead letter exchange and stored in its queue. Below is the code.

created sample project

main.java

package com.spring.rabbit.first.deadletter;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Entry point of the sample: loads the Spring context defined in
 * {@code /applicationContext.xml} on the classpath.
 */
public class Main {

    /**
     * Creates the application context and registers a JVM shutdown hook for it.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("/applicationContext.xml");
        // Without this the context is never closed; the hook closes it when the JVM exits.
        context.registerShutdownHook();
    }
}

xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Spring configuration -->

    <context:component-scan base-package="com.spring.rabbit.first" />
    <context:mbean-export default-domain="com.spring.rabbit.first.deadletter" />

    <!-- RabbitMQ common configuration -->

    <rabbit:connection-factory id="connectionFactory"
        username="guest" password="guest" port="5672" virtual-host="/" host="localhost" />


    <!-- <rabbit:connection-factory id="connectionFactory"/> -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queues -->

    <rabbit:queue id="springQueue" name="spring.queue"
        auto-delete="true" durable="false" />

    <!-- Consumes BBBqueue with the messageListener bean; the retryAdvice bean is
         applied as the container's advice chain. -->
    <rabbit:listener-container
        connection-factory="connectionFactory" advice-chain="retryAdvice">
        <rabbit:listener queues="BBBqueue" ref="messageListener" />
    </rabbit:listener-container>

    <bean id="messageListener" class="com.spring.rabbit.first.deadletter.MessageHandler" />



    <!-- Stateless retry interceptor: retries are driven by retryTemplate, and
         rejectAndDontRequeueRecoverer is configured as the message recoverer. -->
    <bean id="retryAdvice"
        class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
        <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer" />
        <property name="retryOperations" ref="retryTemplate" />
    </bean>

    <bean id="rejectAndDontRequeueRecoverer"
        class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />


    <!-- Retry settings: at most 3 attempts; exponential back-off starting at
         2000 ms, multiplied by 10.0 after each failure, capped at 30000 ms. -->
    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="2000" />
                <property name="multiplier" value="10.0" />
                <property name="maxInterval" value="30000" />
            </bean>
        </property>
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="3" />
            </bean>
        </property>
    </bean>



    <!-- Topic exchange named "AAAqueue" with BBBqueue bound using an empty pattern.
         NOTE(review): an empty topic pattern presumably matches only an empty
         routing key; confirm this is intended. -->
    <rabbit:topic-exchange name="AAAqueue">
        <rabbit:bindings>
            <rabbit:binding queue="BBBqueue" pattern="" />
        </rabbit:bindings>
    </rabbit:topic-exchange>


    <!-- Work queue. Arguments: x-dead-letter-exchange points at the XXX.dead.letter
         exchange and x-message-ttl is 10000 (passed as a java.lang.Long).
         No x-dead-letter-routing-key is set. -->
    <rabbit:queue name="BBBqueue">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="XXX.dead.letter"></entry>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
        </rabbit:queue-arguments>
    </rabbit:queue>


    <!-- dead letter -->

    <!-- Dead-letter exchange. BBBqueue sets no x-dead-letter-routing-key, so
         dead-lettered messages arrive here with the routing key they were originally
         published with. The pattern "#" matches any routing key; an empty pattern
         would only match an empty key, and rejected messages would be dropped as
         unroutable instead of reaching XXX.dead.letter.queue. -->
    <rabbit:topic-exchange name="XXX.dead.letter">
        <rabbit:bindings>
            <rabbit:binding queue="XXX.dead.letter.queue" pattern="#"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <rabbit:queue name="XXX.dead.letter.queue"></rabbit:queue>



</beans>

Message handler

package com.spring.rabbit.first.deadletter;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * Demo listener that prints each delivered message and then fails on purpose,
 * so that the container's retry and recovery handling can be observed.
 */
public class MessageHandler implements MessageListener {

    /**
     * Prints the message and its body decoded as UTF-8 text, then always throws.
     *
     * @param message the delivered AMQP message
     * @throws NullPointerException always, to simulate a failing listener
     */
    @Override
    public void onMessage(Message message) {

        System.out.println("Received message: " + message);
        // Decode with an explicit charset; new String(byte[]) depends on the platform default.
        System.out.println("Text: "
                + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));

        // Deliberate, unconditional failure used to exercise the error path.
        throw new NullPointerException();
    }
}

Messagesender

package com.spring.rabbit.first.deadletter;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Service;

/**
 * JMX-managed service that publishes text messages through the injected
 * {@link AmqpTemplate}.
 */
@Service
@ManagedResource
public class MessageSender {

    /** Exchange used by {@link #send(String)}. */
    private static final String FANOUT_EXCHANGE = "amq.fanout";

    /** Routing key used by {@link #send(String)}. */
    private static final String FANOUT_ROUTING_KEY = "NDPAR.SPRING.JAVA";

    @Autowired
    private AmqpTemplate template;

    /**
     * Sends {@code text} to the "amq.fanout" exchange with the routing key
     * "NDPAR.SPRING.JAVA".
     *
     * @param text the payload to convert and send
     */
    @ManagedOperation
    public void send(String text) {
        this.send(FANOUT_EXCHANGE, FANOUT_ROUTING_KEY, text);
    }

    /**
     * Converts {@code text} and sends it to the given exchange with the given routing key.
     *
     * @param exchange name of the target exchange
     * @param key routing key to publish with
     * @param text the payload to convert and send
     */
    @ManagedOperation
    public void send(String exchange, String key, String text) {
        this.template.convertAndSend(exchange, key, text);
    }
}

output:

23:57:33.753 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
23:57:33.777 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:06.952 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 2000
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:12.888 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 20000
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
23:58:42.393 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] WARN org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer - 
Retries exhausted for message (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue,
 receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

But I still don't see any message in the dead letter queue. Am I missing anything? Kindly help with this.


Solution

  • I am not sure what your XXX-channel and adapter are supposed to do, but you need to add a RejectAndDontRequeueRecoverer to the retry advice factory bean (in the messageRecoverer property).

    The default recoverer just logs that retries are exhausted and discards the message.

    EDIT

    Here is a custom MessageRecoverer that publishes a failed message from queue A to a queue named A.dlq - the queue and binding are declared automatically, as needed.

    /*
     * Copyright 2014-2016 the original author or authors.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.example;
    
    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.Map;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    
    /**
     * {@link MessageRecoverer} that republishes a failed message to the "DLX" direct
     * exchange, using the consumer queue name plus ".dlq" both as the routing key and
     * as the name of the dead-letter queue.
     *
     * <p>The exchange is declared the first time {@link #recover(Message, Throwable)}
     * runs. The queue and its binding are declared whenever the admin reports no
     * properties for that queue.
     */
    public class AutoConfiguringRepublishMessageRecoverer implements MessageRecoverer {

        public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";

        public static final String X_EXCEPTION_MESSAGE = "x-exception-message";

        public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";

        public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

        private static final String DEAD_LETTER_EXCHANGE_NAME = "DLX";

        private final Log logger = LogFactory.getLog(getClass());

        private final RabbitTemplate errorTemplate;

        private final RabbitAdmin admin;

        private final DirectExchange deadletterExchange = new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);

        /** Set once the dead-letter exchange has been declared by this instance. */
        private boolean initialized;

        /**
         * @param errorTemplate template used to republish failed messages; its connection
         * factory also backs the admin that declares the exchange, queues and bindings
         */
        public AutoConfiguringRepublishMessageRecoverer(RabbitTemplate errorTemplate) {
            this.errorTemplate = errorTemplate;
            this.admin = new RabbitAdmin(errorTemplate.getConnectionFactory());
        }

        /**
         * Adds failure details to the message headers and republishes the message to
         * the dead-letter queue that belongs to its consumer queue.
         *
         * @param message the message whose processing failed
         * @param cause the failure; its cause's message is preferred for the
         * {@value #X_EXCEPTION_MESSAGE} header when a cause is present
         */
        @Override
        public void recover(Message message, Throwable cause) {
            declareExchangeOnce();
            addFailureHeaders(message, cause);

            String targetQueue = message.getMessageProperties().getConsumerQueue() + ".dlq";
            boolean queueMissing = this.admin.getQueueProperties(targetQueue) == null;
            if (queueMissing) {
                declareAndBind(targetQueue);
            }

            this.errorTemplate.send(DEAD_LETTER_EXCHANGE_NAME, targetQueue, message);
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Republishing failed message to " + targetQueue);
            }
        }

        /** Declares the dead-letter exchange unless this instance has already done so. */
        private void declareExchangeOnce() {
            if (this.initialized) {
                return;
            }
            this.admin.declareExchange(this.deadletterExchange);
            this.initialized = true;
        }

        /** Stores the stack trace, exception message and original routing data as headers. */
        private void addFailureHeaders(Message message, Throwable cause) {
            Map<String, Object> headers = message.getMessageProperties().getHeaders();
            headers.put(X_EXCEPTION_STACKTRACE, stackTraceOf(cause));

            Throwable nested = cause.getCause();
            String failureText = (nested == null) ? cause.getMessage() : nested.getMessage();
            headers.put(X_EXCEPTION_MESSAGE, failureText);

            headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
            headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());
        }

        /** Declares the named queue and binds it to the dead-letter exchange under its own name. */
        private void declareAndBind(String queueName) {
            Queue deadLetterQueue = new Queue(queueName);
            this.admin.declareQueue(deadLetterQueue);
            this.admin.declareBinding(
                    BindingBuilder.bind(deadLetterQueue).to(this.deadletterExchange).with(queueName));
        }

        /** Renders the throwable's stack trace into a string. */
        private String stackTraceOf(Throwable cause) {
            StringWriter buffer = new StringWriter();
            cause.printStackTrace(new PrintWriter(buffer, true));
            return buffer.toString();
        }

    }