
Handling Publisher Confirm timeout in Spring AMQP-RabbitMQ


I am testing Spring AMQP's RabbitMQ support and would like to use publisher confirms. What I can't find in either the documentation or the code is how I should handle messages that have stayed unconfirmed for longer than a certain time.

The plain RabbitMQ Java client library provides a Channel.waitForConfirmsOrDie(timeout) method, which works nicely, but using it would force me to go beneath the Spring abstraction. It also blocks until the outstanding confirms arrive, whereas I would rather keep publishing and retry the unconfirmed messages. (Incidentally, it would be brilliant if Spring Retry could be used for this; currently I have to implement the retries myself.)

I did find RabbitTemplate.getUnconfirmed(long), but it does not seem to be thread-safe: while my publisher is sending messages continuously and I try to resend the unconfirmed messages older than 5 seconds, it throws:

Exception in thread "publisher-A-500000to999999" java.util.ConcurrentModificationException
at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1207)
at java.util.TreeMap$EntryIterator.next(TreeMap.java:1243)
at java.util.TreeMap$EntryIterator.next(TreeMap.java:1238)
at org.springframework.amqp.rabbit.core.RabbitTemplate.getUnconfirmed(RabbitTemplate.java:503)
at com.mycompany.rabbitmq.tools.failover.Publisher.resendUnconfirmed(Publisher.java:65)
at com.mycompany.rabbitmq.tools.failover.Publisher.run(Publisher.java:52)
at java.lang.Thread.run(Thread.java:745)
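The trace points at a fail-fast TreeMap iterator inside getUnconfirmed, which suggests the map of pending confirms is being structurally modified (for example, by acks arriving) while it is iterated. The standard-library-only sketch below reproduces that class of failure deterministically on a single thread and contrasts it with ConcurrentSkipListMap, whose iterators are weakly consistent. It illustrates the exception only and is not Spring AMQP's code; FailFastDemo, throwsWhileModifying and fill are hypothetical names.

```java
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

class FailFastDemo {

    // Iterates over the map and removes another entry part-way through,
    // mimicking a confirm arriving while pending entries are being scanned.
    static boolean throwsWhileModifying(Map<Long, String> pending) {
        try {
            for (Map.Entry<Long, String> entry : pending.entrySet()) {
                if (entry.getKey() == 1L) {
                    pending.remove(2L); // structural modification mid-iteration
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Populates the map with three hypothetical delivery tags.
    static Map<Long, String> fill(Map<Long, String> map) {
        for (long tag = 1; tag <= 3; tag++) {
            map.put(tag, "message-" + tag);
        }
        return map;
    }

    public static void main(String[] args) {
        // TreeMap iterators are fail-fast: the next call to next() throws.
        System.out.println("TreeMap throws: "
                + throwsWhileModifying(fill(new TreeMap<>()))); // true
        // ConcurrentSkipListMap iterators are weakly consistent and never throw CME.
        System.out.println("ConcurrentSkipListMap throws: "
                + throwsWhileModifying(fill(new ConcurrentSkipListMap<>()))); // false
    }
}
```

With real threads the timing is nondeterministic, which fits an error that shows up only while the publisher is busy.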

I may be doing something completely wrong: I'm using the CorrelationData as a holder for the message, so that it's easier to resend.

I created a MessageCorrelationData class:

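private static class MessageCorrelationData extends CorrelationData {

    // The original AMQP message, kept so it can be republished as-is.
    private final Message message;
    // Application-level sequence number of the message.
    private final long messageIndex;
    // How many times this message has already been resent.
    private final int retryCount;

    public MessageCorrelationData(Message message, long messageIndex, int retryCount) {
        // A fresh random correlation id for every (re)send attempt.
        super(UUID.randomUUID().toString());
        this.message = message;
        this.messageIndex = messageIndex;
        this.retryCount = retryCount;
    }
}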

And this is my resend logic, which I call after every 100 messages are sent:

private int resendUnconfirmed() {
    // Removes and returns the correlation data of sends still unconfirmed
    // after 5000 ms; returns null if there are none.
    Collection<CorrelationData> unconfirmed = rabbitTemplate.getUnconfirmed(5000);
    int numUnconfirmed = 0;
    if (unconfirmed != null) {
        numUnconfirmed = unconfirmed.size();

        for (CorrelationData correlationData : unconfirmed) {
            // Every send uses a MessageCorrelationData, so the cast is safe here.
            MessageCorrelationData messageCorrelationData = (MessageCorrelationData) correlationData;
            trySend(exchange, messageCorrelationData.message, messageCorrelationData.retryCount + 1, messageCorrelationData.messageIndex);
        }
    }
    return numUnconfirmed;
}
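The resend loop depends on pulling out entries older than a threshold while confirms keep arriving on another thread. As a hedged illustration of how such an age-based lookup can be made safe (this is not Spring's implementation; PendingConfirms, track, confirm and drainOlderThan are hypothetical names), a tracker built on ConcurrentHashMap can claim each expired entry atomically with remove(key, value), so an entry is handed out at most once even if its ack races with the scan. Time is passed in explicitly so the sketch is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for application-side pending-confirm bookkeeping.
class PendingConfirms<T> {

    // Payload plus the time it was published.
    private static final class Pending<P> {
        final P payload;
        final long sentAtMillis;

        Pending(P payload, long sentAtMillis) {
            this.payload = payload;
            this.sentAtMillis = sentAtMillis;
        }
    }

    private final Map<String, Pending<T>> pending = new ConcurrentHashMap<>();

    // Called on the publishing thread right after a send.
    void track(String correlationId, T payload, long nowMillis) {
        pending.put(correlationId, new Pending<>(payload, nowMillis));
    }

    // Called from the confirm callback on ack or nack; returns the payload, or null if unknown.
    T confirm(String correlationId) {
        Pending<T> p = pending.remove(correlationId);
        return p == null ? null : p.payload;
    }

    // Removes and returns payloads sent at least ageMillis before nowMillis.
    // ConcurrentHashMap iterators are weakly consistent, so concurrent
    // confirms cannot cause a ConcurrentModificationException here.
    List<T> drainOlderThan(long ageMillis, long nowMillis) {
        List<T> expired = new ArrayList<>();
        for (Map.Entry<String, Pending<T>> e : pending.entrySet()) {
            Pending<T> p = e.getValue();
            // remove(key, value) succeeds only if no confirm removed the entry first.
            if (nowMillis - p.sentAtMillis >= ageMillis && pending.remove(e.getKey(), p)) {
                expired.add(p.payload);
            }
        }
        return expired;
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms<String> confirms = new PendingConfirms<>();
        confirms.track("a", "message-1", 0);
        confirms.track("b", "message-2", 3_000);
        confirms.track("c", "message-3", 9_000);
        confirms.confirm("b"); // the broker acked message-2
        // At t = 10 s, only message-1 is older than 5 s and still unconfirmed.
        System.out.println(confirms.drainOlderThan(5_000, 10_000)); // [message-1]
        System.out.println(confirms.size()); // 1
    }
}
```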

My callback for the confirms:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    MessageCorrelationData mcd = (MessageCorrelationData) correlationData;
    if (!ack) {
        // The broker nacked the message: republish it with an incremented retry count.
        LOG.error("NACK, cause: " + cause + " resending, retry: " + mcd.retryCount);
        trySend(exchange, mcd.message, mcd.retryCount + 1, mcd.messageIndex);
    }
});
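As written, the callback resends on every NACK with no upper bound, so a message the broker keeps rejecting would be retried indefinitely. One possible refinement, sketched here with the standard library only, is to consult a small policy before calling trySend again: stop after a maximum number of resends and space the attempts with capped exponential backoff. ResendPolicy, shouldRetry and delayMillis are hypothetical names, and the limits in main are arbitrary example values.

```java
// Hypothetical policy a confirm callback could consult before resending.
final class ResendPolicy {
    private final int maxRetries;
    private final long initialDelayMillis;
    private final long maxDelayMillis;

    ResendPolicy(int maxRetries, long initialDelayMillis, long maxDelayMillis) {
        this.maxRetries = maxRetries;
        this.initialDelayMillis = initialDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    // retryCount is the number of resends already attempted (0 on the first NACK).
    boolean shouldRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    // Exponential backoff: initialDelay * 2^retryCount, capped at maxDelay.
    // The loop stops doubling once the cap is reached, so it cannot overflow.
    long delayMillis(int retryCount) {
        long delay = initialDelayMillis;
        for (int i = 0; i < retryCount && delay < maxDelayMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMillis);
    }

    public static void main(String[] args) {
        ResendPolicy policy = new ResendPolicy(3, 100, 1_000);
        for (int retryCount = 0; retryCount <= 4; retryCount++) {
            if (policy.shouldRetry(retryCount)) {
                System.out.println("retry " + retryCount + " after "
                        + policy.delayMillis(retryCount) + " ms");
            } else {
                System.out.println("retry " + retryCount + ": giving up");
            }
        }
        // Prints delays of 100, 200 and 400 ms, then "giving up" for retries 3 and 4.
    }
}
```

In the callback this would amount to checking policy.shouldRetry(mcd.retryCount) before calling trySend and logging or parking the message otherwise. Since the callback is most likely invoked on a client library thread, scheduling the delayed resend (for example with a ScheduledExecutorService) seems safer than sleeping inside the callback.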

And finally the send:

    rabbitTemplate.convertAndSend(exchange, "", amqpMessage, new MessageCorrelationData(amqpMessage, messageIndex, retryCount));

Solution

  • You have found a bug; I have raised a JIRA issue if you want to follow it.

    I am not sure how Spring Retry would help here; if you have some ideas, feel free to open a new feature or 'Improvement' JIRA issue.

    EDIT

    Pull request issued.

    We should have it in a release shortly.