
Correct config when using RabbitMQ as Celery's backend


I'm building a Flask app with Celery, using RabbitMQ as Celery's broker and result backend.

My Celery configuration is:

# app is the Flask application object
app.config.update(
    CELERY_BROKER_URL='amqp://localhost:5672',
    CELERY_RESULT_BACKEND='amqp://',
    CELERY_QUEUE_HA_POLICY='all',
    CELERY_TASK_RESULT_EXPIRES=None,
)

Then, declaring a queue with pika produced a whole series of errors:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# new_task_id holds the task id (defined elsewhere in the app)
channel.queue_declare(queue=new_task_id)

The error:

PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg
'durable' for queue '1419349900' in vhost '/':
received 'true' but current is 'false'

OK, I changed it to channel.queue_declare(queue=new_task_id, durable=True)

Again, an error:

PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg
'auto_delete' for queue '1419350288' in vhost '/':
received 'true' but current is 'false'

OK, I changed it to channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)

This time the error disappeared.

But how could I have known this before hitting these errors? I searched Celery's documentation on this topic (the detailed configuration reference), but didn't find what I needed: it just lists all the configuration items without telling me how to set them. Or is it RabbitMQ's documentation I should refer to?

Thank you!

Edit

  1. You said that all queues declared in the configuration file, or in any registered task, are created. Could you explain this a bit more? And what's the difference between declaring and creating a queue?

  2. You said result queues will be created with the 'durable' and 'auto-delete' flags. Where can I find this information? And how does Celery know that a queue is a result queue?


Solution

  • Celery's default behaviour is to create all missing queues (see the CELERY_CREATE_MISSING_QUEUES documentation). Task queues are created by default with the 'durable' flag. Result queues are created with the 'durable' and 'auto-delete' flags, plus 'x-expires' if your CELERY_TASK_RESULT_EXPIRES parameter is not None (by default it is set to 1 day).

    So, all queues declared in your configuration file, or in any registered task, are created. Moreover, since you use the amqp result backend, the worker (unless you have set the CELERY_IGNORE_RESULT parameter) creates the result queue when the task is initialised, and names it after the task_id.

    So, if you try to redeclare this queue with a conflicting configuration, RabbitMQ refuses it with a 406 PRECONDITION_FAILED error. In any case, you don't have to create it yourself: the worker already does.

    Edit

    1. Queue "declaration", as described in the pika documentation, lets you check whether a queue exists in RabbitMQ and, if it doesn't, create it. If CELERY_CREATE_MISSING_QUEUES is set to True in your Celery configuration, then at initialisation every queue listed in the CELERY_QUEUES or CELERY_DEFAULT_QUEUE parameters, every custom queue declared in registered task options, such as @task(name="custom", queue="my_custom_queue"), and even any queue in a custom CELERY_ROUTES definition, is "declared" to RabbitMQ, and is thus created if it doesn't exist.

    2. Queue parametrization documentation can be found here, in the "Using Transient Queues" paragraph, but the best way to see it is to use the RabbitMQ management plugin, which lets you inspect the declared queues and their configuration in a web UI (a D flag marks a durable queue and an AD flag an auto-delete one). Finally, Celery doesn't "know" whether a queue is a result queue. Instead, when a task is created it is assigned a unique identifier, and this identifier is used as the name of the queue for its result. This means that if the producer of the task waits for the result, it listens on that queue, whenever it gets created. The consumer, once the task has been acknowledged and before it actually runs, and provided the task does not ignore results (through CELERY_IGNORE_RESULT or the task's own options), checks whether a queue named after the task identifier exists; if not, it creates it with the default result configuration (see the Result Backend configuration).
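To make the declare semantics above concrete, here is a toy, in-memory model in Python. It is only a sketch, not RabbitMQ's or pika's code: ToyBroker, PreconditionFailed and NotFound are hypothetical names, only the 'durable' and 'auto_delete' flags are modelled (no x-expires or other arguments), and it assumes the worker has already declared the result queue with the flags given in this answer. Replaying the question's three calls against it gives the same sequence of failures: first 'durable', then 'auto_delete', then success.

```python
class PreconditionFailed(Exception):
    """Toy stand-in for a 406 PRECONDITION_FAILED channel error."""


class NotFound(Exception):
    """Toy stand-in for a 404 NOT_FOUND channel error."""


class ToyBroker:
    """Hypothetical in-memory model of queue.declare semantics.

    Only queue existence and the 'durable'/'auto_delete' flags are
    modelled; this is not how RabbitMQ is implemented.
    """

    def __init__(self):
        self.queues = {}  # queue name -> flags it was created with

    def queue_declare(self, queue, passive=False, durable=False, auto_delete=False):
        current = self.queues.get(queue)
        if passive:
            # Passive declaration: only check that the queue exists.
            if current is None:
                raise NotFound(f"no queue '{queue}'")
            return current
        requested = {"durable": durable, "auto_delete": auto_delete}
        if current is None:
            # Missing queue: declaring it creates it with the requested flags.
            self.queues[queue] = requested
            return requested
        # Existing queue: every flag must match what it was created with.
        for arg, value in requested.items():
            if current[arg] != value:
                raise PreconditionFailed(f"inequivalent arg '{arg}' for queue '{queue}'")
        return current


broker = ToyBroker()
task_id = "1419350288"
# Assumed: the worker declares the result queue first, durable and auto-delete.
broker.queue_declare(task_id, durable=True, auto_delete=True)

# The three attempts from the question, in order.
for kwargs in ({}, {"durable": True}, {"durable": True, "auto_delete": True}):
    try:
        broker.queue_declare(task_id, **kwargs)
        print(kwargs, "-> ok")
    except PreconditionFailed as exc:
        print(kwargs, "->", exc)

# A passive declaration checks existence without asserting any flags.
print(broker.queue_declare(task_id, passive=True))
```

With real pika, the corresponding existence check is channel.queue_declare(queue=new_task_id, passive=True): it succeeds if the queue exists, whatever its flags, and otherwise the broker closes the channel with a 404 NOT_FOUND error.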