In Pika or RabbitMQ, how do I check if any consumers are currently consuming?

I would like to check if a Consumer/Worker is present to consume a Message I am about to send.

If there isn't any Worker, I would start some workers (both consumers and publishers are on a single machine) and then go about publishing Messages.

If there were a function like `connection.check_if_has_consumers`, I would use it somewhat like this:

import pika
import workers

# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
    # start the workers in other processes, using python's `multiprocessing`
    workers.start_workers()

# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin")

But I am unable to find any function with check_if_has_consumers functionality in pika.

Is there some way of accomplishing this using pika? Or maybe by talking to RabbitMQ directly?

I am not completely sure, but I really think RabbitMQ would be aware of the number of consumers subscribed to each queue, since it dispatches messages to them and accepts their acks.
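RabbitMQ does track this per queue: the broker's reply to a queue declare carries a `consumer_count` field alongside `message_count`. Below is a minimal sketch of reading it through a passive declare. `has_consumers` is a hypothetical helper name (not part of pika), and it assumes `channel.queue_declare` returns the declare-ok frame, as pika's blocking channel does.

```python
def has_consumers(channel, queue_name):
    """Return True if the broker reports at least one consumer on the queue.

    Hypothetical helper, not part of pika. `channel` is assumed to be an
    open pika channel, e.g. from `BlockingConnection.channel()`.
    """
    # passive=True only inspects the queue; it never creates or changes it.
    # If the queue does not exist, the broker closes the channel with an error.
    declare_ok = channel.queue_declare(queue=queue_name, passive=True)
    # The declare-ok reply carries both message_count and consumer_count.
    return declare_ok.method.consumer_count > 0


# Usage sketch (needs a running broker and the pika package):
#   connection = pika.BlockingConnection(
#       pika.ConnectionParameters(host='localhost'))
#   channel = connection.channel()
#   if not has_consumers(channel, "worker_queue"):
#       workers.start_workers()
```

The count is a snapshot taken at the moment of the declare, so a worker could still exit between the check and the publish.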

I just got started with RabbitMQ 3 hours ago... any help is welcome...

Here is the code I wrote, if it's any help:

import multiprocessing
import pika

def start_workers(num=3):
    """start workers as non-daemon processes"""
    for i in xrange(num):
        process = WorkerProcess()
        process.start()

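class WorkerProcess(multiprocessing.Process):
    """
    worker process that waits infinitely for task msgs and calls
    the `callback` whenever it gets a msg
    """
    def __init__(self):
        # the base class must be initialised before the process can start
        multiprocessing.Process.__init__(self)
        self.stop_working = multiprocessing.Event()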

    def run(self):
        """
        worker method, open a channel through a pika connection and
        start consuming
        """
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='worker_queue', auto_delete=False,
                              durable=True)

        # don't give work to one worker guy until he's finished
        channel.basic_consume(callback, queue='worker_queue')

        # do what `channel.start_consuming()` does but with stopping signal
        while len(channel._consumers) and not self.stop_working.is_set():
            channel.connection.process_data_events()

        return 0

    def signal_exit(self):
        """exit when finished with current loop"""
        self.stop_working.set()

    def exit(self):
        """exit worker, blocks until worker is finished and dead"""
        self.signal_exit()
        while self.is_alive(): # checking `is_alive()` on zombies kills them
            self.join(1)

    def kill(self):
        """kill now! should not use this, might create problems"""
        self.terminate()
        self.join()

def callback(channel, method, properties, body):
    """pika basic consume callback"""
    print 'GOT:', body
    # do some heavy lifting here
    result = save_to_database(body)
    print 'DONE:', result


I have to move forward, so here is the workaround I am going to take unless a better approach comes along.

RabbitMQ has an HTTP management API, which works once you have turned on the management plugin. Around the middle of the HTTP API page there is:

/api/connections - A list of all open connections.

/api/connections/name - An individual connection. DELETEing it will close the connection.

So, if I connect my Workers and my Producers under different connection names / users, I'll be able to check if the Worker connection is open... (there might be issues when a worker dies...)

will be waiting for a better solution...
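The same HTTP API also exposes per-queue details at `/api/queues/vhost/name`, and that JSON includes a `consumers` count, which may be simpler than inspecting connections. Here is a rough sketch for Python 3 using only the standard library. The function names, the base URL, and the credentials are all assumptions for illustration.

```python
import base64
import json
from urllib.parse import quote
from urllib.request import Request, urlopen


def queue_api_url(base_url, vhost, queue_name):
    """Build the management API URL for one queue."""
    # The vhost must be percent-encoded: the default vhost "/" becomes "%2F".
    return "%s/api/queues/%s/%s" % (
        base_url.rstrip("/"), quote(vhost, safe=""), quote(queue_name, safe=""))


def consumer_count_from_json(payload):
    """Extract the `consumers` field from a queue description (JSON text)."""
    return int(json.loads(payload).get("consumers", 0))


def fetch_consumer_count(base_url, vhost, queue_name, user, password):
    """Ask the management plugin how many consumers a queue has."""
    token = base64.b64encode(("%s:%s" % (user, password)).encode("utf-8"))
    request = Request(
        queue_api_url(base_url, vhost, queue_name),
        headers={"Authorization": "Basic " + token.decode("ascii")})
    with urlopen(request, timeout=5) as response:
        return consumer_count_from_json(response.read().decode("utf-8"))
```

A call might look like `fetch_consumer_count("http://localhost:15672", "/", "worker_queue", user, password)`, where the host, port, and credentials depend on how the management plugin is set up on your machine.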


Just found this in the RabbitMQ docs, but it would be hacky to do from Python:

shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue    0

So I could shell out from Python to something like `echo password | sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'`.

hacky... still hope pika has some python function to do this...
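If the `rabbitmqctl` route is taken anyway, parsing its output in Python avoids the `grep` pipeline. This is only a sketch: the function names are made up for illustration, the command mirrors the one shown above, and it assumes the calling process already has permission to run `rabbitmqctl` (no password piping).

```python
import subprocess


def parse_consumer_counts(output):
    """Turn `list_queues name consumers` output into {queue_name: count}."""
    counts = {}
    for line in output.splitlines():
        # Split off the last column so queue names containing spaces survive.
        parts = line.rsplit(None, 1)
        # Banner lines such as "Listing queues ..." have no numeric last column.
        if len(parts) == 2 and parts[1].isdigit():
            counts[parts[0]] = int(parts[1])
    return counts


def list_consumer_counts(vhost="/"):
    """Run rabbitmqctl and return consumer counts for every queue in a vhost."""
    output = subprocess.check_output(
        ["rabbitmqctl", "-p", vhost, "list_queues", "name", "consumers"])
    return parse_consumer_counts(output.decode("utf-8"))
```

With the output shown above, `parse_consumer_counts` would give `{"worker_queue": 0}`, so the check becomes `counts.get("worker_queue", 0) == 0`.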



  • I was just looking into this as well. After reading through the source and docs I came across the following in

    @property
    def consumer_tags(self):
        """Property method that returns a list of currently active consumers
        :rtype: list
        """
        return self._consumers.keys()

    My own testing was successful. I used the following where my channel object is self._channel:

    if len(self._channel.consumer_tags) == 0:
  "Nobody is listening.  I'll come back in a couple of minutes.")