
How to stop and restart consuming messages from RabbitMQ with @RabbitListener


I can stop and restart consuming, but after the restart only the messages that were already published are processed; messages I publish afterwards are not.

import java.io.IOException;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;

@Component
public class RabbitMqueue implements Consumer {

    int count = 0;

    @RabbitListener(queues = "dataQueue")
    public void receivedData(@Payload Event msg, Channel channel,
            @Header(AmqpHeaders.CONSUMER_TAG) String tag)
            throws IOException, InterruptedException {

        count++;
        System.out.println("\n Message received from dataQueue is " + msg);

        // Cancelling the consumer works fine.
        if (count == 1) {
            channel.basicCancel(tag);
            System.out.println("Consumer is cancelled");
        }

        count++;
        System.out.println("\n count is " + count + "\n");

        Thread.sleep(5000);

        // Restarting the consumer: already published messages are processed,
        // but newly published messages move from Ready to Unacked in the queue
        // and nothing happens on the consumer side.
        if (count == 2) {
            channel.basicConsume("dataQueue", this);
            System.out.println("Consumer is started");
        }
    }
}

Solution

  • You must not call channel.basicCancel(tag) yourself.

    The channel and consumer are managed by Spring; the only thing you should do with the channel argument is ack or nack messages (and even that is rarely needed - it's better to let the container handle the acks).

    To stop/start the consumer, use the RabbitListenerEndpointRegistry, as described in the Spring AMQP documentation:

    Containers created for annotations are not registered with the application context. You can obtain a collection of all containers by invoking getListenerContainers() on the RabbitListenerEndpointRegistry bean. You can then iterate over this collection, for example, to stop/start all containers, or invoke the Lifecycle methods on the registry itself, which will invoke the operations on each container.

    For example, registry.stop() stops all the listeners.

    You can also get a reference to an individual container by its id with getListenerContainer(String id); for example, registry.getListenerContainer("multi") returns the container for a listener declared with @RabbitListener(id = "multi", ...).
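
    As a rough illustration of the registry approach described above, here is a minimal sketch. It assumes the listener from the question is given an id, for example @RabbitListener(id = "dataListener", queues = "dataQueue"); that id and the ListenerControl class name are hypothetical and are not part of the original post. Only getListenerContainer(String) and the Lifecycle start()/stop() methods are used, and Spring AMQP must be on the classpath.

```java
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.stereotype.Component;

// Hypothetical helper (not from the original post): stops and restarts a
// @RabbitListener container through the registry instead of calling
// channel.basicCancel()/basicConsume() inside the listener method.
@Component
public class ListenerControl {

    private final RabbitListenerEndpointRegistry registry;

    public ListenerControl(RabbitListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    /** Stops the container with the given id; returns false if the id is unknown. */
    public boolean stop(String id) {
        MessageListenerContainer container = registry.getListenerContainer(id);
        if (container == null) {
            return false;
        }
        container.stop();   // the container cancels its consumers itself
        return true;
    }

    /** Starts the container with the given id; returns false if the id is unknown. */
    public boolean start(String id) {
        MessageListenerContainer container = registry.getListenerContainer(id);
        if (container == null) {
            return false;
        }
        container.start();  // the container creates new consumers on the queue
        return true;
    }
}
```

    With such a bean injected, calling listenerControl.stop("dataListener") and later listenerControl.start("dataListener") from outside the listener method (for example from a REST endpoint or a scheduled task) leaves the consumer lifecycle to the container.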