I am able to stop the consuming and restart the consuming but the problem is that when I am restarting the consuming, I am able to process the already published message but when I publish the new messages those are not able to process.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
@Component
public class RabbitMqueue implements Consumer {
int count = 0;
@RabbitListener(queues="dataQueue")
public void receivedData(@Payload Event msg, Channel channel,
@Header(AmqpHeaders.CONSUMER_TAG) String tag) throws IOException,
InterruptedException {
count++;
System.out.println("\n Message recieved from the Dataqueue is " + msg);
//Canceling consuming working fine.
if(count == 1) {
channel.basicCancel(tag);
System.out.println("Consumer is cancle");
}
count++;
System.out.println("\n count is " + count + "\n");
Thread.sleep(5000);
//restarting consumer. able to process already consumed messages
//but not able to see the newly published messages to the queue I mean
//newly published message is moving from ready to unack state but nothing
//happening on the consumer side.
if(count == 2) {
channel.basicConsume("dataQueue", this);
System.out.println("Consumer is started ");
}
}
}
You must not do this channel.basicCancel(tag)
.
The channel/consumer are managed by Spring; the only thing you should do with the consumer argument is ack or nack messages (and even that is rarely needed - it's better to let the container do the acks).
To stop/start the consumer, use the endpoint registry as described in the documentation.
Containers created for annotations are not registered with the application context. You can obtain a collection of all containers by invoking
getListenerContainers()
on theRabbitListenerEndpointRegistry
bean. You can then iterate over this collection, for example, to stop/start all containers or invoke theLifecycle
methods on the registry itself which will invoke the operations on each container.
e.g. registry.stop()
will stop all the listeners.
You can also get a reference to an individual container using its id, using
getListenerContainer(String id)
; for exampleregistry.getListenerContainer("multi")
for the container created by the snippet above.