As part of a sanity check, I want to write some code to make sure the worker has started with the correct set of queues, based on the settings given.
Celery is run like so:
celery worker -A my_app -l INFO -Q awesome_mode
After the app is initialised, I would like to work out which queues Celery is consuming from.
For example, with a made-up attribute app.queues:
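from celery import Celery
from django.conf import settings

app = Celery('my_app')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# app.queues is made up; it only illustrates the check I would like to write
if 'awesome_mode' in app.queues:
    ...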
After a bit of interactive debugging I found app.amqp.queues, which is a dictionary where the key is the name of the queue and the value is a Queue.
Unfortunately the dictionary is not populated immediately after initialisation, but it is populated by the time the worker_ready signal fires.
Placing this code after app initialisation seems to work, although it could probably be placed elsewhere.
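from celery.signals import worker_ready

@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    # list() so the names print as a plain list on Python 3 as well
    print(list(app.amqp.queues.keys()))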
The worker logs:
[2015-04-22 07:41:01,147: WARNING/MainProcess] ['celery', 'awesome_mode']
[2015-04-22 07:41:01,148: WARNING/MainProcess] celery@zaptop ready.
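To turn the handler into the actual sanity check, something along these lines might work. This is only a minimal sketch: missing_queues and install_queue_check are hypothetical names I made up, the expected queue names are assumed to come from your own settings, and the handler only logs an error rather than stopping the worker.

```python
import logging

logger = logging.getLogger(__name__)


def missing_queues(expected, active):
    """Return the expected queue names that are absent from `active`, sorted."""
    return sorted(set(expected) - set(active))


def install_queue_check(app, expected):
    """Connect a worker_ready handler that logs any expected queue that is missing.

    Hypothetical helper: `app` is the Celery app, `expected` an iterable of names.
    """
    # Imported lazily so missing_queues stays usable without Celery installed.
    from celery.signals import worker_ready

    # weak=False keeps the nested handler alive after this function returns.
    @worker_ready.connect(weak=False)
    def check_queues(sender=None, **kwargs):
        missing = missing_queues(expected, app.amqp.queues.keys())
        if missing:
            logger.error("Worker is missing expected queues: %s", ", ".join(missing))

    return check_queues
```

It would be called once after the app is set up, e.g. install_queue_check(app, ['awesome_mode']). Keeping the comparison in a plain function means it can be tested without starting a worker.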