We're using Redis as our result backend. However, for one task we'd like to override this to use RabbitMQ instead.
The documentation for Task.backend says:
The result store backend to use for this task. Defaults to the CELERY_RESULT_BACKEND setting
So I had assumed that we could set Task.backend to a string of the same format accepted by CELERY_RESULT_BACKEND.
So I tried this:
celeryconfig.py
CELERY_RESULT_BACKEND = "redis://redis-host:7777"
tasks.py
@app.task(backend='amqp://guest@localhost/tasks-stg')
def my_task(params):
    ...
However, the worker fails with:
[2015-05-07 13:33:49,264: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
    self.run()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
    self.after_fork()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
    self.initializer(*self.initargs)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
    app=app)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/app/trace.py", line 178, in build_tracer
    store_result = backend.store_result
AttributeError: 'str' object has no attribute 'store_result'
The documentation is incorrect. Task.backend is actually an instance of a backend class from celery.backends. In this case, to override the backend for the task I had to do this:
from celery.backends.amqp import AMQPBackend
@app.task(backend=AMQPBackend(app, url='amqp://guest@localhost/tasks-stg'))
def my_task(params):
    ...
However, the workers continue to use the default backend class and don't seem to offer a way to override this.
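To illustrate why the plain URL string fails while a backend instance gets past that point, here is a small stand-alone sketch. It does not import Celery: StubBackend, build_tracer and try_build are hypothetical stand-ins written for this example, and the only thing they mirror from the traceback is the attribute lookup `backend.store_result`.

```python
# Stand-in code for illustration only; none of this is Celery's real implementation.


class StubBackend(object):
    """Hypothetical minimal backend exposing the one method the tracer looks up."""

    def __init__(self, url):
        self.url = url
        self.stored = {}

    def store_result(self, task_id, result, state):
        # Keep results in memory so the example needs no broker.
        self.stored[task_id] = (result, state)


def build_tracer(backend):
    """Mimic the lookup from the traceback: store_result = backend.store_result."""
    store_result = backend.store_result  # raises AttributeError if backend is a str

    def trace(task_id, fun, *args):
        result = fun(*args)
        store_result(task_id, result, "SUCCESS")
        return result

    return trace


def try_build(backend):
    """Return None if the tracer can be built, else the error message."""
    try:
        build_tracer(backend)
    except AttributeError as exc:
        return str(exc)
    return None


url = "amqp://guest@localhost/tasks-stg"
string_error = try_build(url)                 # a bare URL string
instance_error = try_build(StubBackend(url))  # an object that has store_result

print(string_error)
print(instance_error)
```

The string case reproduces the same kind of AttributeError as in the worker log, because a str has no store_result method. Any object passed as the task's backend needs to provide that method, which is why passing an AMQPBackend instance avoids the error.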