Search code examples
pythoncelerycelery-task

How do I override the backend for celery tasks


we're using Redis as our result backend. However for one task we'd like to override this to use RabbitMQ instead.

The documentation for Task.backend says:

The result store backend to use for this task. Defaults to the CELERY_RESULT_BACKEND setting

So I had assumed that we could set Task.backend to a string of the same format accepted by CELERY_RESULT_BACKEND.

So I try this:

celeryconfig.py

CELERY_RESULT_BACKEND = "redis://redis-host:7777"

tasks.py

@app.task(backend='amqp://guest@localhost/tasks-stg')
def my_task(params):
    ...

However the worker fails with :

[2015-05-07 13:33:49,264: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
    self.run()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
    self.after_fork()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
    self.initializer(*self.initargs)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
    app=app)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/app/trace.py", line 178, in build_tracer
    store_result = backend.store_result
AttributeError: 'str' object has no attribute 'store_result'

Solution

  • The documentation is incorrect. Task.backend is actually an instance of a backend class from celery.backends. In this case to override the task class I had to do this:

    from celery.backends.amqp import AMQPBackend
    
    @app.task(backend=AMQPBackend(app, url='amqp://guest@localhost/tasks-stg'))
    def my_task(params):
        ...
    

    However the workers continue to use the default class and don't seem to offer a way to override this.