I am testing the Spring AMQP implementation for RabbitMQ, and I would like to use publisher confirms. What I cannot find in either the documentation or the code is how I should handle messages that have stayed unconfirmed for longer than a certain age.
The bare RabbitMQ Java client library provides a Channel.waitForConfirmsOrDie(timeout) method, which works nicely, but using it would force me to go beneath the Spring abstraction. Besides, why wouldn't I want to keep publishing while retrying the unconfirmed messages? (By the way, it would be brilliant if spring-retry could be used for this; currently I have to implement the retry myself.)
I did find RabbitTemplate.getUnconfirmed(long), but it does not appear to be thread-safe: when my publisher is sending messages continuously and I try to resend the unconfirmed messages older than 5 seconds, it throws an exception:
Exception in thread "publisher-A-500000to999999" java.util.ConcurrentModificationException
at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1207)
at java.util.TreeMap$EntryIterator.next(TreeMap.java:1243)
at java.util.TreeMap$EntryIterator.next(TreeMap.java:1238)
at org.springframework.amqp.rabbit.core.RabbitTemplate.getUnconfirmed(RabbitTemplate.java:503)
at com.mycompany.rabbitmq.tools.failover.Publisher.resendUnconfirmed(Publisher.java:65)
at com.mycompany.rabbitmq.tools.failover.Publisher.run(Publisher.java:52)
at java.lang.Thread.run(Thread.java:745)
It might be that I'm doing something completely wrong, as I'm using the CorrelationData as a holder of the message, so it's easier to resend.
I created a MessageCorrelationData class:
private static class MessageCorrelationData extends CorrelationData {
    private final Message message;
    private final long messageIndex;
    private final int retryCount;

    public MessageCorrelationData(Message message, long messageIndex, int retryCount) {
        super(UUID.randomUUID().toString());
        this.message = message;
        this.messageIndex = messageIndex;
        this.retryCount = retryCount;
    }
}
And this is my resend logic, which I call after every 100 messages sent:
private int resendUnconfirmed() {
    Collection<CorrelationData> unconfirmed = rabbitTemplate.getUnconfirmed(5000);
    int numUnconfirmed = 0;
    if (unconfirmed != null) {
        numUnconfirmed = unconfirmed.size();
        for (CorrelationData correlationData : unconfirmed) {
            MessageCorrelationData messageCorrelationData = (MessageCorrelationData) correlationData;
            trySend(exchange, messageCorrelationData.message, messageCorrelationData.retryCount + 1, messageCorrelationData.messageIndex);
        }
    }
    return numUnconfirmed;
}
My callback for the confirms:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    MessageCorrelationData mcd = (MessageCorrelationData) correlationData;
    if (!ack) {
        LOG.error("NACK, cause: " + cause + " resending, retry: " + mcd.retryCount);
        trySend(exchange, mcd.message, mcd.retryCount + 1, mcd.messageIndex);
    }
});
And finally the send:
rabbitTemplate.convertAndSend(exchange, "", amqpMessage, new MessageCorrelationData(amqpMessage, messageIndex, retryCount));
You have found a bug; I have raised a JIRA Issue if you want to follow it.
I am not sure how Spring Retry would help here; if you have some ideas, feel free to open a new feature or 'Improvement' JIRA issue.
EDIT
We should have it in a release shortly.
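Until the fix is available, one possible workaround is to track the pending messages yourself in a concurrent map instead of calling getUnconfirmed(long) from the publishing thread. The following is only a minimal sketch under stated assumptions: PendingConfirms, Tracked, sent, confirmed and drainOlderThan are hypothetical names, not part of Spring AMQP, and the class is deliberately independent of RabbitTemplate so that it can be run on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not a Spring AMQP class): tracks messages awaiting a publisher confirm. */
final class PendingConfirms<M> {

    /** A tracked message together with the time (in ms) at which it was sent. */
    static final class Tracked<M> {
        final String id;
        final M message;
        final long sentAtMillis;

        Tracked(String id, M message, long sentAtMillis) {
            this.id = id;
            this.message = message;
            this.sentAtMillis = sentAtMillis;
        }
    }

    // ConcurrentHashMap iterators are weakly consistent, so iterating while
    // other threads add or remove entries never throws ConcurrentModificationException.
    private final Map<String, Tracked<M>> pending = new ConcurrentHashMap<>();

    /** Record a message right before it is handed to the template. */
    void sent(String id, M message, long nowMillis) {
        pending.put(id, new Tracked<>(id, message, nowMillis));
    }

    /** Call from the confirm callback; returns null if the id is no longer tracked. */
    Tracked<M> confirmed(String id) {
        return pending.remove(id);
    }

    /** Removes and returns every entry sent more than ageMillis before nowMillis. */
    List<Tracked<M>> drainOlderThan(long ageMillis, long nowMillis) {
        long cutoff = nowMillis - ageMillis;
        List<Tracked<M>> expired = new ArrayList<>();
        for (Tracked<M> t : pending.values()) {
            // remove(key, value) succeeds only if this exact entry is still present,
            // so an entry confirmed concurrently is not returned for resending.
            if (t.sentAtMillis < cutoff && pending.remove(t.id, t)) {
                expired.add(t);
            }
        }
        return expired;
    }

    int size() {
        return pending.size();
    }
}
```

In the setup from the question, this would presumably mean calling sent(...) with the correlation id just before convertAndSend, calling confirmed(correlationData.getId()) in the confirm callback, and resending whatever drainOlderThan(5000, System.currentTimeMillis()) returns under a fresh correlation id. Note that a late confirm for an already drained id is simply ignored here, so the broker may receive duplicates.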