Tags: node.js, rabbitmq, node-amqplib

RabbitMQ: Ack/Nack a message on a channel that is closed and reopened


I'm getting this error from the RabbitMQ server:

Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 80"

This happens because the connection is lost while the consumer task is running. When the task finishes and the message is acked/nacked, I get this error, because a message cannot be acked on a channel other than the one it was delivered on.

Here is the code for the RabbitMQ connection:

async connect({ prefetch = 1, queueName }) {
    this.queueName = queueName;
    console.log(`[AMQP][${this.queueName}] | connecting`);
    return queue
        .connect(this.config.rabbitmq.connstring)
        .then(conn => {
            conn.once('error', err => {
                this.channel = null;
                if (err.message !== 'Connection closing') {
                    console.error(
                        `[AMQP][${this.queueName}] (evt:error) | ${err.message}`,
                    );
                }
            });

            conn.once('close', () => {
                this.channel = null;
                console.error(
                    `[AMQP][${this.queueName}] (evt:close) | reconnecting`,
                );
                this.connect({ prefetch, queueName: this.queueName });
            });
            return conn.createChannel();
        })
        .then(ch => {
            console.log(`[AMQP-channel][${this.queueName}] created`);
            ch.on('error', err => {
                console.error(
                    `[AMQP-ch][${this.queueName}] (evt:error) | ${err.message}`,
                );
            });
            ch.on('close', () => {
                console.error(`[AMQP-ch][${this.queueName}] (evt:close)`);
            });
            this.channel = ch;
            return this.channel;
        })
        .then(ch => {
            return this.channel.prefetch(prefetch);
        })
        .then(ch => {
            return this.channel.assertQueue(this.queueName);
        })
        .then(async ch => {
            while (this.buffer.length > 0) {
                const request = this.buffer.pop();
                await request();
            }
            return this.channel;
        })
        .catch(error => {
            console.error(error);
            console.log(`[AMQP][${this.queueName}] reconnecting in 1s`);
            return this._delay(1000).then(() =>
                this.connect({ prefetch, queueName: this.queueName }),
            );
        });
}

async ack(msg) {
    try {
        if (this.channel) {
            console.log(`[AMQP][${this.queueName}] ack`);
            await this.channel.ack(msg);
        } else {
            console.log(`[AMQP][${this.queueName}] ack (buffer)`);
            this.buffer.push(() => {
                this.ack(msg);
            });
        }
    } catch (e) {
        console.error(`[AMQP][${this.queueName}] ack error: ${e.message}`);
    }
}

As you can see, a channel is created once the connection is established. When a connection issue occurs, the channel is set to null, and after 1 second the connection is retried and a new channel is created.

To handle the offline period, I use a buffer that collects all the acks issued while the channel is null. Once the connection is re-established, I flush the buffer.

So basically I need a way to send an ack after a connection is lost or a channel is closed for whatever reason.

Thanks for any help


Solution

  • You cannot acknowledge a message once the channel it was delivered on is closed, whatever the reason. The broker automatically requeues the message and redelivers it, possibly to another consumer.

    This is documented in the RabbitMQ guide on consumer acknowledgements and publisher confirms:

    When Consumers Fail or Lose Connection: Automatic Requeueing

    When manual acknowledgements are used, any delivery (message) that was not acked is automatically requeued when the channel (or connection) on which the delivery happened is closed. This includes TCP connection loss by clients, consumer application (process) failures, and channel-level protocol exceptions (covered below).

    ...

    Due to this behavior, consumers must be prepared to handle redeliveries and otherwise be implemented with idempotence in mind. Redeliveries will have a special boolean property, redeliver, set to true by RabbitMQ. For first time deliveries it will be set to false. Note that a consumer can receive a message that was previously delivered to another consumer.

    As the documentation suggests, you need to handle this on the consumer side by making message processing idempotent. In other words, your architecture should be prepared for the same message to be delivered more than once after an error.
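    A minimal sketch of such a consumer, assuming every publisher sets a unique `messageId` property (the question's code does not show this). `makeIdempotentHandler`, `doWork`, and `processed` are hypothetical names, and the in-memory `Set` stands in for what would need to be a durable store. In amqplib, the redelivery flag mentioned in the quote is exposed as `msg.fields.redelivered`.

    ```javascript
    // Sketch of an idempotent handler. `processed` is an in-memory Set here; a real
    // consumer would need a store that survives restarts (assumption).
    function makeIdempotentHandler(channel, doWork, processed = new Set()) {
        // Acknowledge if possible. A failure means the channel is gone and the
        // broker will redeliver, which the duplicate check below absorbs.
        const tryAck = msg => {
            try {
                channel.ack(msg);
                return true;
            } catch (err) {
                return false;
            }
        };

        return async function onMessage(msg) {
            if (msg === null) return 'cancelled'; // amqplib passes null on consumer cancel

            const id = msg.properties.messageId; // assumes the publisher sets messageId
            if (id !== undefined && processed.has(id)) {
                tryAck(msg); // duplicate: acknowledge without redoing the work
                return 'skipped';
            }

            await doWork(msg);
            if (id !== undefined) processed.add(id);
            tryAck(msg);
            return 'processed';
        };
    }
    ```

    After a reconnect, a new handler would be built around the new channel while reusing the same `processed` store, so a message whose ack was lost with the old channel is recognised when it comes back.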

    Alternatively, you can disable manual acknowledgements and get "at-most-once" delivery. In that case a message whose processing fails is lost, and you have to deal with that loss.
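    In amqplib, disabling acknowledgements corresponds to the `noAck` option of `channel.consume`. A minimal sketch, where `startAtMostOnceConsumer` and `handle` are hypothetical names and `channel` is assumed to be an already-open amqplib channel:

    ```javascript
    // Starts a consumer in automatic-acknowledgement mode: the broker considers a
    // message settled as soon as it is sent, so the client never acks and a crash
    // or disconnect during handling loses the message.
    function startAtMostOnceConsumer(channel, queueName, handle) {
        return channel.consume(
            queueName,
            msg => {
                if (msg === null) return; // consumer was cancelled by the broker
                // No channel.ack(msg) here: with noAck there is nothing to acknowledge.
                Promise.resolve()
                    .then(() => handle(msg))
                    .catch(err =>
                        console.error(`handler failed, message lost: ${err.message}`),
                    );
            },
            { noAck: true },
        );
    }
    ```

    With this mode the "unknown delivery tag" error cannot occur, because no delivery tag is ever sent back to the broker.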

    Further reading on the matter: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery/

    And the follow-up, written after Kafka introduced new semantics: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery-redux/
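
Applied to the code in the question, this means the ack buffer cannot work: delivery tags are scoped to a channel, so an ack replayed on the new channel refers to a tag that channel never issued. A minimal sketch of the alternative, assuming the consumer remembers which channel delivered each message and simply drops the ack when that channel is no longer the current one. `ChannelBoundAcker`, `setChannel`, and `track` are hypothetical names, not part of amqplib.

```javascript
// Remembers the channel each message arrived on and refuses to ack on any other.
class ChannelBoundAcker {
    constructor() {
        this.channel = null; // currently open channel, or null while reconnecting
        this.origin = new WeakMap(); // message -> channel that delivered it
    }

    // Call with the new channel after (re)connecting, and with null on close.
    setChannel(ch) {
        this.channel = ch;
    }

    // Call for every message as it arrives from the consumer callback.
    track(msg) {
        this.origin.set(msg, this.channel);
        return msg;
    }

    // Returns true if the ack was sent, false if it was dropped.
    ack(msg) {
        const ch = this.origin.get(msg);
        if (!ch || ch !== this.channel) {
            // The delivering channel is gone: the broker has already requeued
            // the message, so there is nothing left to acknowledge.
            return false;
        }
        try {
            ch.ack(msg);
            return true;
        } catch (err) {
            return false; // channel closed between the check and the call
        }
    }
}
```

In the question's `connect`, `setChannel(ch)` would take the place of `this.channel = ch`, and `setChannel(null)` would be called from the connection and channel `close` handlers. A dropped ack is not an error here. It only means the message will be delivered again, which is why the consumer still has to be idempotent.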