Search code examples
springrabbitmqspring-amqp

Perform an action in the first retry attempt to save the message on the Database


I'm trying to save every message that comes to a RabbitMQ queue on a database, just for logging. After that, the message needs to be processed normally.

The problem is: this queue has a retry policy configured with RetryOperationsInterceptor, and every time there is some error when processing the message, the message is requeued and processed again. The logic to save the message is in the Listener that reads the queue, so instead of just one message save on the database, I have 3 (the number of retries that I configured).

See my RetryOperationsInterceptor:

@Bean
public RetryOperationsInterceptor defaultRetryOperationsInterceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(3) 
            .backOffOptions(2000, 2.0, 10000)
            .build();
}

The container factory:

@Bean(name = FACTORY_CONTAINER_NAME)
public SimpleRabbitListenerContainerFactory factoryQueueExample(ConnectionFactory connectionFactory,
                                                                        RetryOperationsInterceptor defaultRetryOperationsInterceptor) {

    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(getMessageConverter());
    factory.setDefaultRequeueRejected(false);

    Advice[] adviceChain = {defaultRetryOperationsInterceptor};
    factory.setAdviceChain(adviceChain);

    return factory;
}

The queue Listener:

@Slf4j
@Component
@AllArgsConstructor
public class MessageListener {

    private final MessageRepository messageRepository;
    private final MessageService messageService;

    @RabbitListener(queues = MessageConfiguration.QUEUE,
        containerFactory = MessageConfiguration.FACTORY_CONTAINER_NAME)
    public void process(SomeMessage someMessage) {

        messageRepository.save(someMessage); // transform to a entity and save on database
        messageService.process(someMessage); // process message
    }
}

I don't know if is a relevant information, but this queue has also a DLQ associated. After this retries, the message goes to a DLQ queue.

My ideia is find something in the Retry interceptor that could call a service on the first attempt, to save the message just once.

I'm open for another ideas to resolve this problem too, like save the attempt number with the message, just to show that is not a repetead message being save on the database, but the same message in a different attempt.


Solution

  • Pay attention how you apply a retry:

    Advice[] adviceChain = {defaultRetryOperationsInterceptor};
    factory.setAdviceChain(adviceChain);
    

    The same way you can write your own MethodInterceptor to write into DB. And when you define this custom advice in order before that defaultRetryOperationsInterceptor, the DB save will be called only once, just because such an operation is going to happen outside of retry and even before.

    See more info about AOP in the Docs: https://docs.spring.io/spring/docs/5.0.6.RELEASE/spring-framework-reference/core.html#aop