Search code examples
pythoncelery

How to determine which queues a Celery worker is consuming at runtime?


As part of a sanity check, I want to write some code to make sure the worker has started with a correct set of queues based on the settings given.

Celery is run like so:

celery worker -A my_app -l INFO -Q awesome_mode

I would like to work out after the app is initialised, which queues Celery is consuming.

e.g., I made up app.queues:

app = Celery('my_app')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

if 'awesome_mode' in app.queues:
    ...

Solution

  • After a bit of interactive debugging I found app.amqp.queues which is a dictionary where the key is the name of the queue and the value is a Queue.

    Unfortunately the dictionary is not populated immediately after initialisation, but does work after the worker_ready signal.

    Placing this code after app initialisation seems to work. It could probably be placed elsewhere of course.

    @worker_ready.connect
    def worker_ready_handler(sender=None, **kwargs):
        print(app.amqp.queues.keys())
    

    The worker logs:

    [2015-04-22 07:41:01,147: WARNING/MainProcess] ['celery', 'awesome_mode']
    [2015-04-22 07:41:01,148: WARNING/MainProcess] celery@zaptop ready.