Search code examples
rabbitmqspring-amqpspring-rabbit

Spring rabbitMQ DLQ on missing message_id


Currently I've set up my message listener container to use spring-retry for handling retries but when someone sends a message without giving a message-id, the message listener stops. Can this behaviour be changes so that it puts the message on the dead letter queue instead of stopping the listener ?

My configuration for retry is the following:

 @Bean
 public StatefulRetryOperationsInterceptor retryInterceptor() {
   StatefulRetryOperationsInterceptorFactoryBean f = new     
   StatefulRetryOperationsInterceptorFactoryBean();
   f.setRetryOperations(retryTemplate());
   f.setMessageRecoverer(new RejectAndDontRequeueRecoverer());

   return f.getObject();
}

private RetryOperations retryTemplate() {
  RetryTemplate retryTemplate = new RetryTemplate();
  ExponentialRandomBackOffPolicy backOffPolicy = new ExponentialRandomBackOffPolicy();

  backOffPolicy.setInitialInterval(50);
  backOffPolicy.setMultiplier(1.5);
  backOffPolicy.setMaxInterval(1000);
  retryTemplate.setBackOffPolicy(backOffPolicy);

  SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
  retryPolicy.setMaxAttempts(10);
  retryTemplate.setRetryPolicy(retryPolicy);

  return retryTemplate;
}

and I get the following exception:

2014-08-01 08:50:27,858 [taskExecutor<OmittedForPrivacy>-2] WARN  mqp.rabbit.listener.SimpleMessageListenerContainer - Execution of Rabbit message listener failed, and no ErrorHandler has been set.
org.springframework.amqp.rabbit.listener.FatalListenerExecutionException: Illegal null id in message. Failed to manage retry for message: (Body:'{
    <Omitted for privacy>
}'; ID:null; Content:application/json; Headers:{__TypeId__=<OmittedForPrivacy>}; Exchange:; RoutingKey:<OmittedForPrivacy>; Reply:null; DeliveryMode:NON_PERSISTENT; DeliveryTag:1)
    at org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean$3.getKey(StatefulRetryOperationsInterceptorFactoryBean.java:114) ~[spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:132) ~[spring-retry-1.1.0.RELEASE.jar:na]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.0.0.RELEASE.jar:4.0.0.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) ~[spring-aop-4.0.0.RELEASE.jar:4.0.0.RELEASE]
    at com.sun.proxy.$Proxy612.invokeListener(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:620) [spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:454) ~[spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:480) [spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:464) [spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:61) [spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:558) [spring-rabbit-1.2.1.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_17]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_17]
    at java.lang.Thread.run(Thread.java:722) [na:1.7.0_17]

So I would like to change the fact that he stops the message listener container afterwards and instead just puts the message on the deadletter queue.

Krgds


Solution

  • What do you mean the listener "stops"; please show your config and post a DEBUG log someplace showing the behavior you describe.

    EDIT:

    Ah - the AmqpRejectAndDontRequeueRecover is to recover after listener exceptions; this exception occurs before we even get into the retry logic.

    You can add a MissingMessageIdAdvice to the advice chain (before the retry advice).

    It will let a message with a missing id be tried, but will throw a RejectAndDontRequeueException if it is redelivered (and fails again) - only one retry, regardless of the retry setting.

    If you want to reject such messages without trying at all, you would have to create your own advice.