java rabbitmq amqp

ConfirmListener.handleNack is not invoked when exchange is missing


In my application I need to determine whether a message was successfully published to an AMQP exchange or whether some error happened. Publisher confirms seem to have been designed for exactly this, so I started experimenting with them.

For my Java application I used com.rabbitmq:amqp-client:jar:3.5.4 and chose a very simple scenario in which the exchange I publish to is missing. I expected ConfirmListener.handleNack to be invoked in that case.

Here's my Java code:

package wheleph.rabbitmq_tutorial.confirmed_publishes;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConfirmedPublisher {
    private static final Logger logger = LoggerFactory.getLogger(ConfirmedPublisher.class);

    private final static String EXCHANGE_NAME = "confirmed.publishes";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                logger.debug(String.format("Received ack for %d (multiple %b)", deliveryTag, multiple));
            }

            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                logger.debug(String.format("Received nack for %d (multiple %b)", deliveryTag, multiple));
            }
        });

        for (int i = 0; i < 100; i++) {
            String message = "Hello world" + channel.getNextPublishSeqNo();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            logger.info(" [x] Sent '" + message + "'");
            Thread.sleep(2000);
        }

        channel.close();
        connection.close();
    }
}

However, that is not the case. The log shows that no callback is executed:

17:49:34,988 [main] ConfirmedPublisher -  [x] Sent 'Hello world1'
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirmed.publishes' in vhost '/', class-id=60, method-id=40)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309)
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:657)
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:640)
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:631)
    at wheleph.rabbitmq_tutorial.confirmed_publishes.ConfirmedPublisher.main(ConfirmedPublisher.java:38)

What's interesting is that publisher confirms work as expected when I use the Node.js library amqp-coffee (0.1.24).

Here's my NodeJS code:

var AMQP = require('amqp-coffee');

var connection = new AMQP({host: 'localhost'});
connection.setMaxListeners(0);

console.log('Connection started');

connection.publish('node.confirm.publish', '', 'some message', {deliveryMode: 2, confirm: true}, function(err) {
  if (err && err.error && err.error.replyCode === 404) {
    console.log('Got 404 error');
  } else if (err) {
    console.log('Got some error');
  } else {
    console.log('Message successfully published');
  }
});

Here's the output, which shows that the callback is invoked with the proper argument:

Connection started
Got 404 error

Am I using com.rabbitmq:amqp-client incorrectly, or is there some inconsistency in that library?


Solution

  • It turned out that my assumption was wrong: ConfirmListener.handleNack is not supposed to be invoked in this case.

    Here's the relevant portion of the AMQP traffic observed with the amqp-coffee library for the scenario described in the question:

    ch#1 -> {#method<channel.open>(out-of-band=), null, ""}
    ch#1 <- {#method<channel.open-ok>(channel-id=), null, ""}
    ch#1 -> {#method<confirm.select>(nowait=false), null, ""}
    ch#1 <- {#method<confirm.select-ok>(), null, ""}
    ch#1 -> {#method<basic.publish>(ticket=0, exchange=node.confirm.publish, routing-key=, mandatory=false, immediate=false), #contentHeader<basic>(content-type=string/utf8, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null), "some message"}
    ch#1 <- {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'node.confirm.publish' in vhost '/', class-id=60, method-id=40), null, ""}
    ch#2 -> {#method<channel.open>(out-of-band=), null, ""}
    ch#2 <- {#method<channel.open-ok>(channel-id=), null, ""}
    ch#2 -> {#method<confirm.select>(nowait=false), null, ""}
    ch#2 <- {#method<confirm.select-ok>(), null, ""}
    

    You can see that:

    1. After the unsuccessful publish, the broker closes the channel with channel.close, which includes the reason (reply-code 404, NOT_FOUND).
    2. basic.nack is not sent.
    3. The library automatically opens another channel for subsequent operations.
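    In the Java client the same information arrives in the ShutdownSignalException handed to a ShutdownListener: when cause.isHardError() is false, cause.getReason() is the broker's AMQP.Channel.Close method, whose getReplyCode() and getReplyText() carry the 404 and the "NOT_FOUND - no exchange ..." text. The sketch below keeps only the JDK-only part, a hypothetical helper ReplyCodes (not part of amqp-client) that names the reply code, so the Java side can branch on 404 the way the amqp-coffee callback does. The table lists the channel-level ("soft") errors from the AMQP 0-9-1 specification that a failed publish or declare typically produces; treat it as a starting point, not a complete list.

    ```java
    // Hypothetical helper, not part of amqp-client: names the reply-code of a
    // broker-initiated channel.close so callers can branch on it.
    public class ReplyCodes {

        // Channel-level ("soft") errors from the AMQP 0-9-1 specification.
        public static String describe(int replyCode) {
            switch (replyCode) {
                case 403: return "ACCESS_REFUSED";
                case 404: return "NOT_FOUND";
                case 405: return "RESOURCE_LOCKED";
                case 406: return "PRECONDITION_FAILED";
                default:  return "UNKNOWN(" + replyCode + ")";
            }
        }

        // True for the soft errors above: only the channel was closed, so the
        // connection can be reused to open a replacement channel.
        public static boolean isChannelLevel(int replyCode) {
            return !describe(replyCode).startsWith("UNKNOWN");
        }

        public static void main(String[] args) {
            // Inside a ShutdownListener this value would come from
            // ((AMQP.Channel.Close) cause.getReason()).getReplyCode()
            int replyCode = 404;
            System.out.println(describe(replyCode) + " channelLevel=" + isChannelLevel(replyCode));
        }
    }
    ```

    Running main prints "NOT_FOUND channelLevel=true".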

    This behaviour can be reproduced in Java with a ShutdownListener that discards the closed channel, so that a new one is opened before the next publish:

    package wheleph.rabbitmq_tutorial.confirmed_publishes;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ShutdownListener;
    import com.rabbitmq.client.ShutdownSignalException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConfirmedPublisher {
        private static final Logger logger = LoggerFactory.getLogger(ConfirmedPublisher.class);
        private final static String EXCHANGE_NAME = "confirmed.publishes";
    
        // Beware that proper synchronization of channel is needed because current approach may lead to race conditions
        private volatile static Channel channel;
    
        public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
    
            final Connection connection = connectionFactory.newConnection();
    
            for (int i = 0; i < 100; i++) {
                if (channel == null) {
                    createChannel(connection);
                }
                String message = "Hello world" + i;
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
                logger.info(" [x] Sent '" + message + "'");
                Thread.sleep(2000);
            }
    
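            // The broker may already have closed the channel (and the listener reset it to null)
            Channel lastChannel = channel;
            if (lastChannel != null && lastChannel.isOpen()) {
                lastChannel.close();
            }
            connection.close();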
        }
    
        private static void createChannel(final Connection connection) throws IOException {
            channel = connection.createChannel();
            channel.confirmSelect(); // This in fact is not necessary
            channel.addShutdownListener(new ShutdownListener() {
                public void shutdownCompleted(ShutdownSignalException cause) {
                    // Beware that proper synchronization is needed here
                    logger.debug("Handling channel shutdown...", cause);
                    if (cause.isInitiatedByApplication()) {
                        logger.debug("Shutdown is initiated by application. Ignoring it.");
                    } else {
                        logger.error("Shutdown is NOT initiated by application. Resetting the channel.");
                        /* We cannot re-initialize channel here directly because ShutdownListener callbacks run in the connection's thread,
                           so the call to createChannel causes a deadlock since it blocks waiting for a response (whilst the connection's thread
                           is stuck executing the listener). */
                        channel = null;
                    }
                }
            });
        }
    }
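    The comments above warn that the shared static channel field is racy: a late shutdown notification for an old channel could reset a channel that was just recreated. One way to narrow that window is to clear the slot only if it still holds the channel that actually shut down. ChannelSlot below is a hypothetical, JDK-only sketch (not an amqp-client class); in the real program the factory would wrap connection.createChannel() (rethrowing IOException as UncheckedIOException) and register a ShutdownListener that calls slot.invalidate(thatChannel). Because invalidate is a single compareAndSet, it never blocks the connection thread, so it does not run into the deadlock described in the listener's comment.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    // Hypothetical helper, not part of amqp-client: lazily (re)creates a resource
    // such as a Channel and lets a shutdown callback discard only the stale one.
    public class ChannelSlot<T> {
        private final Supplier<T> factory;
        private final AtomicReference<T> current = new AtomicReference<>();
        private final Object createLock = new Object();

        public ChannelSlot(Supplier<T> factory) {
            this.factory = factory;
        }

        // Returns the current resource, creating one if the slot is empty.
        // Only publishing threads take createLock; shutdown callbacks never do.
        public T get() {
            T existing = current.get();
            if (existing != null) {
                return existing;
            }
            synchronized (createLock) {
                existing = current.get();
                if (existing == null) {
                    existing = factory.get();
                    current.set(existing);
                }
                return existing;
            }
        }

        // Called from the shutdown callback. Clears the slot only if it still holds
        // the resource that shut down (identity comparison), never blocking.
        // A channel that dies before get() stores it can still slip through,
        // so callers may additionally check isOpen().
        public boolean invalidate(T stale) {
            return current.compareAndSet(stale, null);
        }

        public static void main(String[] args) {
            AtomicInteger created = new AtomicInteger();
            ChannelSlot<String> slot = new ChannelSlot<>(() -> "channel-" + created.incrementAndGet());
            String first = slot.get();              // creates channel-1
            slot.invalidate(first);                 // broker closed channel-1
            String second = slot.get();             // creates channel-2
            boolean late = slot.invalidate(first);  // stale notification is ignored
            System.out.println(first + " " + second + " " + late + " " + slot.get());
        }
    }
    ```

    Running main prints "channel-1 channel-2 false channel-2".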
    

    There are a few caveats:

    1. Publisher confirms are not necessary in this case because we don't use ConfirmListener or any other functionality specific to that approach. However, publisher confirms would be useful if we wanted to track which messages were successfully sent and which were not.
    2. If we launch ConfirmedPublisher and create the missing exchange some time later, all subsequent messages will be published successfully. However, all previously failed messages are lost.
    3. Additional synchronization is needed.
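    Caveats 1 and 2 can be combined: with confirms enabled, remember each message under channel.getNextPublishSeqNo() before publishing, drop it on handleAck, and when the ShutdownListener fires treat whatever is still outstanding as failed (the trace above shows that the broker sends no basic.nack in that case), so it can be republished on the next channel. UnconfirmedTracker below is a hypothetical, JDK-only sketch of that bookkeeping, not an amqp-client class; wiring it to ConfirmListener and ShutdownListener is assumed. Sequence numbers start again from 1 on a freshly confirm-selected channel, so the tracker must be empty (as channelClosed leaves it) before the replacement channel is used.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    // Hypothetical helper, not part of amqp-client: keeps every message published
    // on a confirm-mode channel until the broker confirms it.
    public class UnconfirmedTracker {
        private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

        // Call just before basicPublish, with channel.getNextPublishSeqNo().
        public void published(long seqNo, String message) {
            outstanding.put(seqNo, message);
        }

        // Wire to ConfirmListener.handleAck: confirmed messages are forgotten.
        public void acked(long deliveryTag, boolean multiple) {
            remove(deliveryTag, multiple);
        }

        // Wire to ConfirmListener.handleNack: returns the messages to retry.
        public List<String> nacked(long deliveryTag, boolean multiple) {
            return remove(deliveryTag, multiple);
        }

        // Wire to the channel's ShutdownListener: no nack arrives when the broker
        // closes the channel, so everything still outstanding counts as failed.
        public List<String> channelClosed() {
            List<String> failed = new ArrayList<>();
            Map.Entry<Long, String> entry;
            while ((entry = outstanding.pollFirstEntry()) != null) {
                failed.add(entry.getValue());
            }
            return failed;
        }

        // multiple=true covers every sequence number up to and including deliveryTag.
        private List<String> remove(long deliveryTag, boolean multiple) {
            List<String> removed = new ArrayList<>();
            if (multiple) {
                ConcurrentNavigableMap<Long, String> head = outstanding.headMap(deliveryTag, true);
                removed.addAll(head.values());
                head.clear();
            } else {
                String message = outstanding.remove(deliveryTag);
                if (message != null) {
                    removed.add(message);
                }
            }
            return removed;
        }

        public static void main(String[] args) {
            UnconfirmedTracker tracker = new UnconfirmedTracker();
            for (long seq = 1; seq <= 4; seq++) {
                tracker.published(seq, "Hello world" + seq);
            }
            tracker.acked(2, true);                        // confirms 1 and 2
            System.out.println(tracker.channelClosed());   // messages to republish
        }
    }
    ```

    Running main prints "[Hello world3, Hello world4]".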