Search code examples
spring-bootrabbitmqspring-rabbit

RabbitMQ channel.addConfirmListener() , interface ackCallback Some callbacks are missing?


This is my code, channel.addConfirmListener() ackCallback Some callbacks will be lost, The message is indeed sent to the rabbitmq server and can be consumed normally , But I sleep for 2ms after sending the message, and all ack callbacks can be received,

I don't know if this is an error in my code or a rabbitmq bug

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.log4j.Log4j2;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

@Log4j2
public class 异步确认发布{
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("");
        connectionFactory.setPort(7005);
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 开启确认发布
        AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
    
        channel.queueDeclare("hello", true, false, false, null);
        //  异步确认发布消息 回调
        channel.addConfirmListener(
                (deliveryTag, multiple) -> {
                    log.info("消息deliveryTag=>{}, send successful", deliveryTag);
                },
                (deliveryTag, multiple) -> {
                    log.info("消息deliveryTag=>{}, fail in send", deliveryTag);
                }
        );
        for (int i = 0; i < 5; i++) {
            String message = "Hello World!!!   " + i;
            channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

The console shows some callbacks missing

17:04:29.607 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:04:29.615 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful

But I sleep for 2ms after sending the message, and all callbacks can be received

example code

for (int i = 0; i < 5; i++) {
    String message = "Hello World!!!   " + i;
    channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
    Thread.sleep(2);  // I sleep for 2ms after sending the message, and all ack callbacks can be received
}

console log

17:05:18.037 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>1, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>2, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>3, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>4, send successful
17:05:18.043 [AMQP Connection 27.11.210.232:7005] INFO me.demo.me.rabbitmq.consumer.发布确认.异步确认发布 - ackCallback, deliveryTag=>5, send successful

My RabbitMQ Server Version is 3.9.14 (No configuration has been modified. The default configuration is used), Erlang 24.3.2 ,

Maven Project dependency in

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.18.RELEASE</version>
</dependency>

I tried to prevent the main thread from shutting down, but it doesn't seem to be the reason for the main thread to shut down, because the main thread won't shut down automatically once the connection is created


Solution

  • I am not sure why you tagged this with because you are not using the spring-rabbit APIs at all; you are using the amqp-client directly.

    This is working as designed; for performance reasons, the confirm callback has the additional argument multiple when true; this means that all tags up to and including this one are confirmed with a single confirmation.

    https://www.rabbitmq.com/tutorials/tutorial-seven-java.html

    multiple: this is a boolean value. If false, only one message is confirmed/nack-ed, if true, all messages with a lower or equal sequence number are confirmed/nack-ed.