spring-boot rabbitmq spring-amqp dead-letter retry-logic

RabbitMQ message still retries after throwing AmqpRejectAndDontRequeueException


I have a simple Spring Boot application with the following RabbitMQ settings (spring-boot-starter-amqp version 2.7.0):

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: my_host
    username: admin
    password: password
    template:
      retry:
        enabled: true
        initial-interval: 3s
        max-interval: 10s
        multiplier: 2
        max-attempts: 3
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 3s
          max-interval: 10s
          multiplier: 2
          max-attempts: 3

A SimpleRabbitListenerContainerFactory is configured with a MessagePostProcessor:

@Bean
public SimpleRabbitListenerContainerFactory myInterceptContainerFactory(final SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                        final ConnectionFactory connectionFactory,
                                                                        final MyMessagePostProcessor myMessagePostProcessor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory() {
        @Override
        protected void initializeContainer(SimpleMessageListenerContainer instance, RabbitListenerEndpoint endpoint) {
            instance.setAfterReceivePostProcessors(myMessagePostProcessor);
            super.initializeContainer(instance, endpoint);
        }
    };
    ((CachingConnectionFactory) connectionFactory).setRequestedHeartBeat(60);
    configurer.configure(factory, connectionFactory);
    return factory;
}

The MessagePostProcessor is very simple: if the version in the header is wrong, it throws an AmqpRejectAndDontRequeueException:

@Component
public class MyMessagePostProcessor implements MessagePostProcessor {

    private static final Logger LOGGER = LogManager.getLogger(MyMessagePostProcessor.class);

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        if (!"1.0".equalsIgnoreCase(message.getMessageProperties().getHeader("version"))) {
            LOGGER.error("wrong version");
            throw new AmqpRejectAndDontRequeueException("wrong version");
        }
        return message;
    }

}

When I send a message with the wrong version, it works as expected, according to the logs:

MyApp: 2022-07-05 15:49:33,675 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.p.MyMessagePostProcessor:22 - wrong version
MyApp: 2022-07-05 15:49:33,676  WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.l.ConditionalRejectingErrorHandler:169 - Execution of Rabbit message listener failed.
org.springframework.amqp.AmqpRejectAndDontRequeueException: wrong version
    at com.my.app.queue.postprocess.MyMessagePostProcessor.postProcessMessage(MyMessagePostProcessor.java:23) ~[classes/:1.0.0]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1544) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]

My consumer only logs one error and then immediately throws AmqpRejectAndDontRequeueException, so that I can see whether the message goes to the dead-letter queue without being retried:

@Component
public class MyMessageConsumer {
    private static final Logger LOGGER = LogManager.getLogger(MyMessageConsumer.class);

    @RabbitListener(queues = "my.queue", containerFactory = "myInterceptContainerFactory")
    public void consumerMessage() {
        LOGGER.error("error in consuming message");
        throw new AmqpRejectAndDontRequeueException("over");
    }

}

But the logs clearly show that it is retried 3 times (my error log is printed 3 times), and only then is the message delivered to the dead-letter queue:

MyApp: 2022-07-05 15:50:32,182 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:46,603 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:52,617 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.g.r.a.q.c.MyMessageConsumer:23 - error in consuming message
MyApp: 2022-07-05 15:50:52,618  WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer:74 - Retries exhausted for message (Body:'[B@4060893b(byte[75])' MessageProperties [headers={version=1.0, requestor=MyApp}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=my.queue, deliveryTag=2, consumerTag=amq.ctag-5Xx6EwwjVokRV6wD9xL8Og, consumerQueue=my.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid()' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:271) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.amqp.rabbit.listener.$Proxy187.invokeListener(Unknown Source) ~[?:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1564) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: over
    at com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid(MyMessageConsumer.java:24) ~[classes/:1.0.0]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:262) ~[spring-rabbit-2.4.5.jar:2.4.5]
    ... 27 more
MyApp: 2022-07-05 15:50:52,620  WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] o.s.a.r.l.ConditionalRejectingErrorHandler:169 - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
    at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:77) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.amqp.rabbit.listener.$Proxy187.invokeListener(Unknown Source) ~[?:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1564) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException
    ... 19 more
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid()' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:271) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
    ... 14 more
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: over
    at com.my.app.queue.consumer.MyMessageConsumer.retrieveNcid(MyMessageConsumer.java:24) ~[classes/:1.0.0]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:262) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:208) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:147) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97) ~[spring-retry-1.3.3.jar:?]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:?]
    ... 14 more

But I want the message to be delivered to the dead-letter queue without being retried 3 times.

I guess it has something to do with ListenerExecutionFailedException: Retry Policy Exhausted, but I don't understand why my AmqpRejectAndDontRequeueException is wrapped in this exception. My MessagePostProcessor doesn't throw ListenerExecutionFailedException; the only exception thrown is the one I throw explicitly in code.

What am I missing here?


Solution

  • The container knows nothing about the exception until retries are exhausted, because the retry interceptor wraps the listener invocation and sees the exception first.

    You would need to customize the retry policy to not retry for that exception.

    This is currently not possible using application properties; you would need to configure the retry interceptor as a bean and inject it into the container factory.

    If you need help with that, I can add an example.
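
A minimal sketch of such a configuration follows, assuming spring-retry's SimpleRetryPolicy and Spring AMQP's RetryInterceptorBuilder as shipped with the versions in the question. The class name RetryConfig and the bean name noRetryOnRejectInterceptor are made up for illustration, and the back-off values simply mirror the YAML above. The policy sets traverseCauses to true because, as the stack trace shows, the listener adapter wraps the AmqpRejectAndDontRequeueException in a ListenerExecutionFailedException before the retry advice sees it.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.retry.policy.SimpleRetryPolicy;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class RetryConfig {

    @Bean
    public RetryOperationsInterceptor noRetryOnRejectInterceptor() {
        // Mark AmqpRejectAndDontRequeueException as not retryable.
        Map<Class<? extends Throwable>, Boolean> retryable = new HashMap<>();
        retryable.put(AmqpRejectAndDontRequeueException.class, false);

        SimpleRetryPolicy policy = new SimpleRetryPolicy(
                3,          // max attempts, as in the YAML
                retryable,
                true,       // traverseCauses: look inside ListenerExecutionFailedException
                true);      // defaultValue: retry every other exception

        return RetryInterceptorBuilder.stateless()
                .retryPolicy(policy)
                .backOffOptions(3000, 2.0, 10000) // initial interval, multiplier, max interval (ms)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }

    // Simplified version of the factory from the question; the
    // MessagePostProcessor wiring and heartbeat setting are omitted for brevity.
    @Bean
    public SimpleRabbitListenerContainerFactory myInterceptContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory,
            RetryOperationsInterceptor noRetryOnRejectInterceptor) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        // Set after configure() so this advice replaces the one built from
        // spring.rabbitmq.listener.simple.retry.* properties.
        factory.setAdviceChain(noRetryOnRejectInterceptor);
        return factory;
    }
}
```

With this in place, the first AmqpRejectAndDontRequeueException should go straight to the recoverer, which rejects the message so the broker can route it to the dead-letter queue, assuming the queue is configured with a dead-letter exchange. Other exceptions would still be retried up to 3 times.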