
Spring annotation @Retryable - how to set an interceptor


I am using the @Retryable annotation on a method in a @Service class:

@Service
@EnableRetry 
public class PushService {

    @Retryable(maxAttempts=5)
    public Result pushIt(myMessage messageIn) {
        ...
    }
}

and it works like a charm: I get a message directly from RabbitMQ, and it is not acknowledged until either the call succeeds or the number of attempts reaches 5. At that point the message goes straight to the DLQ, just as I wanted.

My only problem is that I need to set maxAttempts dynamically, from a property file. The solution should be to set an interceptor, but merely having one causes an error, for example when I have:

@Service
@EnableRetry 
public class PushService {

    @Retryable(interceptor="myInterceptor") 
    public Result pushIt(myMessage messageIn) {
        ...
    }
}

where myInterceptor is defined as:

@Bean
public StatefulRetryOperationsInterceptor myInterceptor() {
    return RetryInterceptorBuilder.stateful().maxAttempts(5).build();
}

I get an infinite loop with the following exception:

2015-04-08 07:12:10,970 GMT [SimpleAsyncTaskExecutor-1] (ConditionalRejectingErrorHandler.java:67) WARN  listener.ConditionalRejectingErrorHandler: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean$3.getKey(StatefulRetryOperationsInterceptorFactoryBean.java:103)
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:132)
    at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:118)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:653)
    at com.acme.push.service.PushService$$EnhancerBySpringCGLIB$$9d503bc1.pushMessage(<generated>)
    at com.acme.push.receiver.PushListener.onMessage(PushListener.java:42)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
    ... 10 more

I am pretty sure I am keeping it too simple, but I have no clue what could cause this error or how to solve it. Does anybody have an idea of what is going on?
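
A note on the trace above: the ArrayIndexOutOfBoundsException: 1 is raised in getKey of StatefulRetryOperationsInterceptorFactoryBean, which suggests that the stateful interceptor derives its retry key from the method argument at index 1. That would fit a listener-container invocation, which passes a channel and a message, but not pushIt, which takes a single argument. If the failed delivery is then requeued and hits the same error again, that would also explain the endless loop. The sketch below only imitates such a lookup in plain Java; KeyLookupDemo and keyFrom are hypothetical names for illustration, not spring-amqp code.

```java
// Illustration only: imitates a key lookup that reads the second method argument.
// KeyLookupDemo and keyFrom are hypothetical names, not spring-amqp code.
final class KeyLookupDemo {

    // Returns the argument at index 1, the position where a
    // (channel, message) listener call would carry the message.
    static Object keyFrom(Object[] args) {
        return args[1];
    }

    public static void main(String[] unused) {
        // Two-argument call, shaped like a listener invocation: the lookup works.
        System.out.println(keyFrom(new Object[] {"channel", "message"}));

        // One-argument call, shaped like pushIt(messageIn): index 1 does not exist.
        try {
            keyFrom(new Object[] {"messageIn"});
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }
    }
}
```

If this reading is right, an interceptor built this way belongs in the listener container's advice chain rather than on a service method.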


Solution

  • I finally managed to get the flexibility I needed without using the @Retryable annotation.

    I created a RetryAdvice with my parameters for delay and maximum number of attempts:

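    @Bean
    public MethodInterceptor retryAdvice() {
        // delay (in milliseconds) and maxAttempts are assumed to be fields of this configuration class
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(delay);
        // stateless: the retries run inside this interceptor, within a single delivery
        return RetryInterceptorBuilder.stateless().backOffPolicy(backOffPolicy)
                .maxAttempts(maxAttempts).build();
    }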
    

    and I added the advice to the adviceChain of the listener container:

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(pushConnectionFactory());
        container.setQueues(pushQueue());
        container.setMessageListener(pushListener());
    
        Advice[] adviceChain = new Advice[] { retryAdvice() };
        container.setAdviceChain(adviceChain);
    
        return container;
    }
    

    With this setup, whenever my listener throws

    throw new AmqpRejectAndDontRequeueException(cause);


    the container retries the indicated number of times with the desired delay, after which the exception is propagated and the message is delivered to the DLQ.
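
To make the retry behaviour concrete, here is a minimal plain-Java sketch of a stateless retry with a fixed delay whose settings come from a Properties object. It is a hand-rolled loop for illustration, not Spring Retry's implementation. The class name RetrySketch and the property keys push.retry.maxAttempts and push.retry.delayMillis are assumptions chosen for this example. In a Spring configuration class the two values would more commonly be injected with @Value placeholders.

```java
import java.util.Properties;
import java.util.function.Supplier;

// Illustration only: a hand-rolled retry loop, not Spring Retry's implementation.
final class RetrySketch {

    // Hypothetical property keys; use whatever names your property file defines.
    static int maxAttempts(Properties props) {
        return Integer.parseInt(props.getProperty("push.retry.maxAttempts", "5"));
    }

    static long delayMillis(Properties props) {
        return Long.parseLong(props.getProperty("push.retry.delayMillis", "1000"));
    }

    // Calls the action up to maxAttempts times, sleeping delayMillis after each
    // failed attempt except the last. If the last attempt fails, its exception
    // is rethrown to the caller.
    static <T> T callWithRetry(int maxAttempts, long delayMillis, Supplier<T> action) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(delayMillis);
                }
            }
        }
        throw last;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }
}
```

The Properties object can be filled from a file with Properties.load(...), and the result passed along as RetrySketch.callWithRetry(RetrySketch.maxAttempts(props), RetrySketch.delayMillis(props), action). The rethrow after the final attempt corresponds to the point where, in the setup above, the exception reaches the container and the message is rejected.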