
Republish message to same queue with updated headers after automatic nack in Spring AMQP


I am trying to configure my Spring AMQP ListenerContainer to allow for a certain type of retry flow that's backwards compatible with a custom rabbit client previously used in the project I'm working on.

The protocol works as follows:

  1. A message is received on a channel.
  2. If processing fails, the message is nacked with the requeue flag set to false.
  3. A copy of the message with additional/updated headers (a retry counter) is published to the same queue
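Step 3 boils down to copying the headers and bumping a counter. A minimal sketch in plain Java, assuming the counter lives in the xb-count header used in the code further down; the class and method names here are made up for illustration and are not part of Spring AMQP:

```java
import java.util.HashMap;
import java.util.Map;

public class RetryHeaders {

    // Header name taken from the listing in this question.
    public static final String COUNT_HEADER = "xb-count";

    // Returns a copy of the headers with the retry counter incremented.
    // A missing or non-numeric counter is treated as 0.
    public static Map<String, Object> withIncrementedCount(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        Object current = copy.get(COUNT_HEADER);
        long count = current instanceof Number ? ((Number) current).longValue() : 0L;
        copy.put(COUNT_HEADER, count + 1);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> first = withIncrementedCount(Map.of("tenant", "a"));
        Map<String, Object> second = withIncrementedCount(first);
        System.out.println(first.get(COUNT_HEADER));  // prints 1
        System.out.println(second.get(COUNT_HEADER)); // prints 2
    }
}
```

Reading the counter as a Number rather than a Long keeps the count intact if a client writes it as an Integer.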

The headers are used for filtering incoming messages, but that's not important here.

I would like the behaviour to happen on an opt-in basis, so that more standardised Spring retry flows can be used in cases where compatibility with the old client isn't a concern, and the listeners should be able to work without requiring manual acking.

I have implemented a working solution, which I'll get back to below. Where I'm struggling is publishing the new message after signalling to the container that it should nack the current message, because I can't find any good hooks that run after the nack or before the next message.

Reading the documentation, it feels like I'm looking for something analogous to the behaviour of RepublishMessageRecoverer used as the final step of a retry interceptor. The main difference in my case is that I need to republish immediately on failure, not as a final recovery step. I tried to look at the implementation of RepublishMessageRecoverer, but the many layers of indirection made it hard for me to understand where the republishing is triggered, and whether a nack happens before that.

My working implementation looks as follows. Note that I'm using a ThrowsAdvice, but I think an error handler could also be used with nearly identical logic.

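/*
MyConfig.class, configuring the container factory
*/
import org.aopalliance.aop.Advice;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration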
public class MyConfig {
    @Bean
    // NB: bean name is important, overwrites autoconfigured bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            Jackson2JsonMessageConverter messageConverter,
            RabbitTemplate rabbitTemplate
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter);

        // AOP
        var a1 = new CustomHeaderInspectionAdvice();

        var a2 = new MyThrowsAdvice(rabbitTemplate);

        Advice[] adviceChain = {a1, a2};

        factory.setAdviceChain(adviceChain);

        return factory;
    }
}

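/*
MyThrowsAdvice.class, hooking into the exception flow from the listener
*/
import java.lang.reflect.Method;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.aop.ThrowsAdvice;

public class MyThrowsAdvice implements ThrowsAdvice {

    private static final Logger logger = LoggerFactory.getLogger(MyThrowsAdvice.class);
    private final AmqpTemplate amqpTemplate;

    public MyThrowsAdvice(AmqpTemplate amqpTemplate) {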
        this.amqpTemplate = amqpTemplate;
    }


    public void afterThrowing(Method method, Object[] args, Object target, ListenerExecutionFailedException ex) {
        var message = message(args);
        var cause = ex.getCause();

        // opt-in to old protocol by throwing an instance of BusinessException in business logic
        if (cause instanceof BusinessException) {
            /* 
            NB: Since we want to trigger execution after the current method fails
            with an exception we need to schedule it in another thread and delay
            execution until the nack has happened. 
            */
            new Thread(() -> {
                try {
                    Thread.sleep(1000L);
                    var messageProperties = message.getMessageProperties();
                    var count = getCount(messageProperties);
                    messageProperties.setHeader("xb-count", count + 1);
                    var routingKey = messageProperties.getReceivedRoutingKey();
                    var exchange = messageProperties.getReceivedExchange();
                    amqpTemplate.send(exchange, routingKey, message);
                    logger.info("Sent!");
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before logging
                    Thread.currentThread().interrupt();
                    logger.error("Sleep interrupted", e);
                }
            }).start();
            // NB: Produce the desired nack.
            throw new AmqpRejectAndDontRequeueException("Business logic exception, message will be re-queued with updated headers", cause);
        }
    }

    private static long getCount(MessageProperties messageProperties) {
        try {
            // The header may arrive as an Integer or a Long depending on the publisher
            Number c = messageProperties.getHeader("xb-count");
            return c == null ? 0 : c.longValue();
        } catch (Exception e) {
            return 0;
        }
    }

    private static Message message(Object[] args) {
        try {
            return (Message) args[1];
        } catch (Exception e) {
            logger.info("Bad cast parse", e);
            throw new AmqpRejectAndDontRequeueException(e);
        }
    }

}

Now, as you can imagine, I'm not particularly pleased with the nondeterminism of scheduling a new thread with a delay.

So my question is simply: is there any way I could produce a deterministic solution to my problem using the provided hooks of the ListenerContainer?


Solution

  • Your current solution risks message loss because you publish on a different thread after a delay. If the server crashes during that delay, the original has already been rejected without requeue and the copy has not been published yet, so the message is lost.

    It would be better to publish immediately to another queue with a TTL and dead-letter configuration, so that the broker republishes the expired message back to the original queue.

    Using the RepublishMessageRecoverer in a retry interceptor with maxAttempts set to 1 should do what you need.
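
A rough sketch of how the two suggestions above could fit together, assuming Spring AMQP 2.x or 3.x with spring-retry on the classpath. The queue names, the 1 second TTL and the class name are hypothetical. Routing through the default exchange ("") is also an assumption; the question republishes to the received exchange and routing key instead. Overriding additionalHeaders is one way to set the xb-count header from the question on the republished copy:

```java
import java.util.Map;

import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;

public class DelayedRepublishSketch {

    // Hypothetical queue names, not taken from the question.
    static final String WORK_QUEUE = "work.queue";
    static final String DELAY_QUEUE = "work.queue.delay";

    // Delay queue: messages expire after 1000 ms and are dead-lettered through
    // the default exchange back to the work queue. Expose this as a @Bean so
    // that a RabbitAdmin declares it on the broker.
    public static Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE)
                .withArgument("x-message-ttl", 1000)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", WORK_QUEUE)
                .build();
    }

    // Advice for factory.setAdviceChain(...): the listener is invoked once and,
    // if it throws, the message is republished to the delay queue on the
    // listener thread instead of on a separate thread after a sleep.
    public static Advice republishOnFirstFailure(RabbitTemplate rabbitTemplate) {
        RepublishMessageRecoverer recoverer =
                new RepublishMessageRecoverer(rabbitTemplate, "", DELAY_QUEUE) {
                    @Override
                    protected Map<String, Object> additionalHeaders(Message message, Throwable cause) {
                        // Increment the retry counter; a missing or non-numeric value counts as 0.
                        Object current = message.getMessageProperties().getHeaders().get("xb-count");
                        long count = current instanceof Number ? ((Number) current).longValue() : 0L;
                        return Map.<String, Object>of("xb-count", Long.valueOf(count + 1));
                    }
                };
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(1)
                .recoverer(recoverer)
                .build();
    }
}
```

This sketch applies to every failure. Restricting it to the opt-in BusinessException case would still need a check on the exception cause, for example inside a custom recoverer.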