java, queue, rabbitmq, message-queue, amqp

RabbitMQ: deleting a queue does not close the connection


In my Java program, messages are sent over RabbitMQ queues as shown below:

    if (!con.isConnected()) {
        log.error("Not connected !!!");
        return false;
    }
    // Publishes via the default exchange (""), using the queue name as the routing key.
    con.getChannel().basicPublish("", queueName, MessageProperties.PERSISTENT_BASIC, bytes);

Steps to reproduce:

  1. Delete the queues via the RabbitMQ management GUI plugin.
  2. Try to send a message to one of the deleted queues.

Result: The queues were deleted in the RabbitMQ GUI, but when I try to send a message to a deleted queue, the connection is still alive (con.isConnected() == true). I need a way to detect that the queue has been deleted, so that I do not send any messages to it.

Note: I do not restart RabbitMQ after deleting the queue.

Channel creation:

 channel = connection.createChannel();
 channel.queueDeclare(prop.getQueueName(), true, false, false, null);

Example code for channel, queue, and exchange creation:

    ConnectionFactory cf = new ConnectionFactory();
    cf.setUsername("guest");
    cf.setPassword("guest");
    cf.setHost("localhost");
    cf.setPort(5672);
    cf.setAutomaticRecoveryEnabled(true);
    cf.setConnectionTimeout(10000);
    cf.setNetworkRecoveryInterval(10000);
    cf.setTopologyRecoveryEnabled(true);
    cf.setRequestedHeartbeat(5);
    Connection connection = cf.newConnection();

    channel = connection.createChannel();
    channel.queueDeclare("test", true, false, false, null);
    channel.exchangeDeclare("testExchange", "direct", true);
    channel.queueBind("test", "testExchange", "testRoutingKey");

    connection.addShutdownListener(new ShutdownListener() {
        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            System.out.println("test" + cause);
        }
    });

Sending a message:

    channel.basicPublish("testExchange", "testRoutingKey", null, messageBodyBytes);

Solution

  • From the RabbitMQ Google group:

    Messages in AMQP 0-9-1 are not published to queues; they are published to exchanges, from where they are routed to a queue (or another exchange) or not. [1] basic.publish is a completely asynchronous protocol method by design: there is no response for it unless you ask for it [2]. Messages that are unroutable can be returned to the publisher if you define a return listener and publish with the mandatory flag set to true. Note that publisher confirms and the mandatory flag/returns are orthogonal and one does not imply the other.

    Defining a return listener and setting the mandatory flag to true solved my problem. If a message is not routed, I can catch it with the ReturnListener and add it to my persisted queue, so it can be sent again once the system becomes active.
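The "catch returned messages and send them again later" idea can be sketched roughly as below. This is a minimal illustration and not the poster's actual code: `ReturnedMessageBuffer` and its `Publisher` hook are hypothetical names introduced here, and the buffer is in-memory only, whereas the answer mentions a persisted queue. The wiring shown in the comments assumes the RabbitMQ Java client's `Channel.addReturnListener` and the `basicPublish` overload that takes a `mandatory` flag. It is left as comments so that the block compiles without the client library or a running broker.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Wiring sketch for the RabbitMQ Java client (assumed usage, needs amqp-client):
//   channel.addReturnListener((replyCode, replyText, exchange, routingKey, props, body) ->
//           buffer.record(exchange, routingKey, body));
//   channel.basicPublish("testExchange", "testRoutingKey", true, null, messageBodyBytes);
//   // the third argument (true) is the mandatory flag

/**
 * In-memory buffer for messages the broker returned as unroutable.
 * Hypothetical helper for illustration; not part of the RabbitMQ client.
 */
class ReturnedMessageBuffer {

    /** Hypothetical publish hook, so the retry logic does not need a live broker. */
    interface Publisher {
        void publish(String exchange, String routingKey, byte[] body) throws IOException;
    }

    private static final class Pending {
        final String exchange;
        final String routingKey;
        final byte[] body;

        Pending(String exchange, String routingKey, byte[] body) {
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.body = body;
        }
    }

    private final Deque<Pending> pending = new ArrayDeque<>();

    /** Stores one returned message; synchronized because it may be called from a client library thread. */
    synchronized void record(String exchange, String routingKey, byte[] body) {
        pending.addLast(new Pending(exchange, routingKey, body.clone()));
    }

    synchronized int size() {
        return pending.size();
    }

    /**
     * Re-publishes the messages that were buffered when the call started, oldest first.
     * Stops at the first IOException and keeps the failed message at the head of the buffer.
     * Returns how many messages were handed to the publisher successfully.
     */
    int resendAll(Publisher publisher) {
        int batchSize = size();
        int sent = 0;
        for (int i = 0; i < batchSize; i++) {
            Pending next;
            synchronized (this) {
                next = pending.pollFirst();
            }
            if (next == null) {
                break;
            }
            try {
                publisher.publish(next.exchange, next.routingKey, next.body);
                sent++;
            } catch (IOException e) {
                synchronized (this) {
                    pending.addFirst(next); // retry this one on the next attempt
                }
                break;
            }
        }
        return sent;
    }
}
```

Because `resendAll` only processes the messages present when it starts, a message that is returned again during the retry stays buffered for the next attempt instead of looping forever. A production version would presumably write the buffer to durable storage, as the answer describes.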