In my application I need to determine whether a message was successfully published to an AMQP exchange or whether an error occurred. Publisher Confirms seem to have been designed for exactly this, so I started experimenting with them.
For my Java application I used com.rabbitmq:amqp-client:jar:3.5.4 and chose a very simple scenario in which the exchange I try to publish to is missing. I expected ConfirmListener.handleNack to be invoked in that case.
Here's my Java code:
package wheleph.rabbitmq_tutorial.confirmed_publishes;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConfirmedPublisher {
    private static final Logger logger = LoggerFactory.getLogger(ConfirmedPublisher.class);
    private static final String EXCHANGE_NAME = "confirmed.publishes";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // Put the channel into confirm mode and register the ack/nack callbacks
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                logger.debug(String.format("Received ack for %d (multiple %b)", deliveryTag, multiple));
            }

            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                logger.debug(String.format("Received nack for %d (multiple %b)", deliveryTag, multiple));
            }
        });
        for (int i = 0; i < 100; i++) {
            String message = "Hello world" + channel.getNextPublishSeqNo();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            logger.info(" [x] Sent '" + message + "'");
            Thread.sleep(2000);
        }
        channel.close();
        connection.close();
    }
}
However, that is not the case. The log shows that no callback is executed:
17:49:34,988 [main] ConfirmedPublisher - [x] Sent 'Hello world1'
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirmed.publishes' in vhost '/', class-id=60, method-id=40)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:657)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:640)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:631)
at wheleph.rabbitmq_tutorial.confirmed_publishes.ConfirmedPublisher.main(ConfirmedPublisher.java:38)
Interestingly, publisher confirms work as expected when I use the Node.js library amqp-coffee (0.1.24).
Here's my NodeJS code:
var AMQP = require('amqp-coffee');
var connection = new AMQP({host: 'localhost'});
connection.setMaxListeners(0);
console.log('Connection started');
connection.publish('node.confirm.publish', '', 'some message', {deliveryMode: 2, confirm: true}, function(err) {
    if (err && err.error && err.error.replyCode === 404) {
        console.log('Got 404 error');
    } else if (err) {
        console.log('Got some error');
    } else {
        console.log('Message successfully published');
    }
});
Here's the output, which shows that the callback is invoked with the proper argument:
Connection started
Got 404 error
Am I using com.rabbitmq:amqp-client incorrectly, or is there some inconsistency in that library?
It turned out that my assumption was incorrect: ConfirmListener.handleNack should not be invoked in this case.
Here's the relevant portion of the AMQP traffic for the scenario described in the question, as observed with the amqp-coffee library:
ch#1 -> {#method<channel.open>(out-of-band=), null, ""}
ch#1 <- {#method<channel.open-ok>(channel-id=), null, ""}
ch#1 -> {#method<confirm.select>(nowait=false), null, ""}
ch#1 <- {#method<confirm.select-ok>(), null, ""}
ch#1 -> {#method<basic.publish>(ticket=0, exchange=node.confirm.publish, routing-key=, mandatory=false, immediate=false), #contentHeader<basic>(content-type=string/utf8, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null), "some message"}
ch#1 <- {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'node.confirm.publish' in vhost '/', class-id=60, method-id=40), null, ""}
ch#2 -> {#method<channel.open>(out-of-band=), null, ""}
ch#2 <- {#method<channel.open-ok>(channel-id=), null, ""}
ch#2 -> {#method<confirm.select>(nowait=false), null, ""}
ch#2 <- {#method<confirm.select-ok>(), null, ""}
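You can see that:
- After the unsuccessful publish, the broker closes the channel with channel.close, which includes the reason.
- basic.nack is not sent.
- The library then opens a new channel (ch#2) for subsequent operations.
This behaviour can be implemented in Java using ShutdownListener: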
package wheleph.rabbitmq_tutorial.confirmed_publishes;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConfirmedPublisher {
    private static final Logger logger = LoggerFactory.getLogger(ConfirmedPublisher.class);
    private static final String EXCHANGE_NAME = "confirmed.publishes";

    // Beware that proper synchronization of channel is needed because current approach may lead to race conditions
    private static volatile Channel channel;

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        final Connection connection = connectionFactory.newConnection();
        for (int i = 0; i < 100; i++) {
            // The shutdown listener resets the channel to null after a broker-initiated close
            if (channel == null) {
                createChannel(connection);
            }
            String message = "Hello world" + i;
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            logger.info(" [x] Sent '" + message + "'");
            Thread.sleep(2000);
        }
        // The last publish may have caused the broker to close the channel, leaving the field null
        if (channel != null) {
            channel.close();
        }
        connection.close();
    }

    private static void createChannel(final Connection connection) throws IOException {
        channel = connection.createChannel();
        channel.confirmSelect(); // This in fact is not necessary
        channel.addShutdownListener(new ShutdownListener() {
            public void shutdownCompleted(ShutdownSignalException cause) {
                // Beware that proper synchronization is needed here
                logger.debug("Handling channel shutdown...", cause);
                if (cause.isInitiatedByApplication()) {
                    logger.debug("Shutdown is initiated by application. Ignoring it.");
                } else {
                    logger.error("Shutdown is NOT initiated by application. Resetting the channel.");
                    /* We cannot re-initialize channel here directly because ShutdownListener callbacks run in the connection's thread,
                       so the call to createChannel causes a deadlock since it blocks waiting for a response (whilst the connection's thread
                       is stuck executing the listener). */
                    channel = null;
                }
            }
        });
    }
}
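The synchronization concern flagged in the code comments above could be handled, for instance, with a small holder that creates the channel lazily on the publishing thread and lets the shutdown callback clear it without blocking. This is only a sketch under assumptions: ResettableHolder, get and invalidate are hypothetical names, and it is written against a generic Supplier so that it depends on nothing but the JDK (Java 8). A real factory wrapping connection.createChannel() would have to convert the IOException into an unchecked exception.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: lazily creates a resource (e.g. a Channel) and allows it to be reset. */
final class ResettableHolder<T> {
    private final Supplier<T> factory;
    private final AtomicReference<T> current = new AtomicReference<T>();

    ResettableHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    /**
     * Returns the current instance, creating it on the caller's thread if there is none.
     * Synchronized so that two publishing threads never create two instances at once.
     */
    synchronized T get() {
        T value = current.get();
        if (value == null) {
            value = factory.get();
            current.set(value);
        }
        return value;
    }

    /**
     * Intended to be called from a shutdown callback. It takes no lock and never calls
     * the factory, so it cannot block the thread that delivers the callback. The
     * compare-and-set clears the reference only if 'dead' is still the current instance,
     * so a late callback for an old instance cannot wipe out a freshly created one.
     */
    void invalidate(T dead) {
        current.compareAndSet(dead, null);
    }
}
```

With such a holder, the publishing loop would call holder.get() before each basicPublish, and shutdownCompleted would call holder.invalidate(ch) for the channel the listener was registered on. A publish can still hit a channel that was closed a moment ago, so the caller still has to be prepared for AlreadyClosedException.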
There are a few caveats:
- Publisher confirms are not actually used in this case, because we don't use ConfirmListener or any other functionality specific to that approach. However, publisher confirms would be useful if we wanted to track which messages were successfully sent and which were not.
- If we launch ConfirmedPublisher and create the missing exchange after some time, all following messages will be published successfully. However, all the previously failed messages are lost.
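The lost-message problem could be approached with publisher confirms: the publisher remembers each message under its sequence number until the broker confirms it, so whatever is left when the channel gets closed can be republished. The following is a minimal sketch of that bookkeeping, not a definitive implementation. ConfirmTracker and its methods are hypothetical names; the class uses only the JDK, and handleAck/handleNack take the same parameters as the ConfirmListener callbacks so that a real listener could simply delegate to them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical helper: keeps every published message until the broker confirms it. */
final class ConfirmTracker {
    // Publish sequence number -> message body, ordered by sequence number
    private final ConcurrentNavigableMap<Long, String> outstanding =
            new ConcurrentSkipListMap<Long, String>();
    // Messages the broker explicitly rejected with basic.nack
    private final List<String> rejected = new CopyOnWriteArrayList<String>();

    /** Call right before publishing, with the sequence number the channel will assign. */
    void track(long seqNo, String message) {
        outstanding.put(seqNo, message);
    }

    /** Same parameters as ConfirmListener.handleAck. */
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // 'multiple' confirms every message up to and including deliveryTag
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    /** Same parameters as ConfirmListener.handleNack. */
    void handleNack(long deliveryTag, boolean multiple) {
        if (multiple) {
            ConcurrentNavigableMap<Long, String> nacked = outstanding.headMap(deliveryTag, true);
            rejected.addAll(nacked.values());
            nacked.clear();
        } else {
            String message = outstanding.remove(deliveryTag);
            if (message != null) {
                rejected.add(message);
            }
        }
    }

    /** Messages that received neither ack nor nack, e.g. because the channel was closed. */
    List<String> unconfirmed() {
        return new ArrayList<String>(outstanding.values());
    }

    /** Messages that were nacked by the broker. */
    List<String> rejected() {
        return new ArrayList<String>(rejected);
    }
}
```

In the publisher one would call tracker.track(channel.getNextPublishSeqNo(), message) right before channel.basicPublish(...), and in the ShutdownListener treat tracker.unconfirmed() as the messages to resend once a new channel exists. Sequence numbers start over on every new channel, so this sketch assumes a fresh tracker per channel.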