python · postgresql · sqlalchemy · celery

Celery with postgresql giving error "can't adapt type 'AsyncResult'"


I am using PostgreSQL as my backend for Celery (v5.3.5).

Celery is returning an SQL error when I call ready() on the task AsyncResult:

sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'AsyncResult'
[SQL: SELECT celery_taskmeta.id AS celery_taskmeta_id, celery_taskmeta.task_id AS celery_taskmeta_task_id, celery_taskmeta.status AS celery_taskmeta_status, celery_taskmeta.result AS celery_taskmeta_result, celery_taskmeta.date_done AS celery_taskmeta_date_done, celery_taskmeta.traceback AS celery_taskmeta_traceback, celery_taskmeta.name AS celery_taskmeta_name, celery_taskmeta.args AS celery_taskmeta_args, celery_taskmeta.kwargs AS celery_taskmeta_kwargs, celery_taskmeta.worker AS celery_taskmeta_worker, celery_taskmeta.retries AS celery_taskmeta_retries, celery_taskmeta.queue AS celery_taskmeta_queue 
FROM celery_taskmeta 
WHERE celery_taskmeta.task_id = %(task_id_1)s]
[parameters: {'task_id_1': <AsyncResult: 0ae578c2-85b2-4d13-9002-50604329a480>}]
(Background on this error at: https://sqlalche.me/e/20/f405)

There is quite a long traceback, mostly SQLAlchemy; the last Celery frame was:
  File "/dist-packages/celery/backends/database/__init__.py", line 152, in _get_task_meta_for
    task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))

This is the task worker script (called task_queue.py):

from time import sleep

from celery import Celery

broker = 'sqla+postgresql://user:password@server/db'
backend = 'db+postgresql://user:password@server/db'
app: Celery = Celery('tasks', broker=broker, backend=backend)

@app.task(bind=True)
def long_running_task(self, seconds: int):
    """A task that takes a number of seconds to complete."""
    print('Starting count off of {0} seconds'.format(seconds))
    for i in range(seconds):
        print('{0} seconds left'.format(seconds - i))
        sleep(1)

and this is how it is called:

from time import sleep

from celery.result import AsyncResult
from task_queue import long_running_task

id = long_running_task.delay(5)
print(f"Task ID {id} queued")
task: AsyncResult = AsyncResult(id, app=long_running_task.app)

while not task.ready():
    print("Waiting for task to complete")
    sleep(1)

I cannot see what I am doing wrong here; this same code works if I use rpc as the backend. Clearly that SQL query from Celery expects a task id but is receiving an AsyncResult instead. Is this a bug in Celery? Any ideas much appreciated.

(Addendum: the same error occurs when trying to read any information from the task result, e.g. task.name, task.result, task.args.)


Solution

  • Came across the answer entirely by accident while reading comments here: How to check task status in Celery?

    It turns out that calling delay() already returns an AsyncResult. Wrapping that result in another AsyncResult(...) stores the whole result object, rather than the id string, as the new result's task_id; the database backend then passes that object straight through to SQLAlchemy as a query parameter, which psycopg2 cannot adapt. (Printing the object shows the task id, since its string form is the id, which is why the code looked correct.)

    Hence the fix is simply to query the object returned from delay() directly; there is no need to construct a separate AsyncResult:

    Changing:

    id = long_running_task.delay(5)
    print(f"Task ID {id} queued")
    task : AsyncResult = AsyncResult(id, app=long_running_task.app)
    

    to:

    task = long_running_task.delay(5)
    print(f"Task ID {task} queued, state {task.state}, task_id type {type(task)}")
    

    Works!
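To see why the original code failed, here is a minimal, Celery-free sketch. FakeAsyncResult is a hypothetical stand-in for celery.result.AsyncResult, reduced to the two behaviours that matter here: it stores whatever id it is handed, and its string form is the task id.

```python
class FakeAsyncResult:
    """Hypothetical stand-in for celery.result.AsyncResult: it stores
    whatever 'id' it is handed (object or string) without coercion."""
    def __init__(self, task_id):
        self.id = task_id

    def __str__(self):
        # Printing a result shows the task id, which is why
        # f"Task ID {id}" looked correct even with the wrapped object.
        return str(self.id)


first = FakeAsyncResult("0ae578c2-85b2-4d13-9002-50604329a480")

# The buggy code wrapped one result in another, so the inner *object*
# (not the id string) became the new result's task_id:
wrapped = FakeAsyncResult(first)
print(type(wrapped.id).__name__)  # FakeAsyncResult, not str

# If a fresh AsyncResult really is needed (e.g. to look a task up from
# another process), pass the id string explicitly instead:
unwrapped = FakeAsyncResult(first.id)
print(type(unwrapped.id).__name__)  # str
```

With the database backend, it is that un-coerced task_id that ends up as the WHERE-clause parameter, and psycopg2 has no adapter for an AsyncResult object; the rpc backend presumably coerces the id to a string along the way, which would explain why the same code appeared to work there. The real Celery AsyncResult also exposes the id string as its .id attribute, so AsyncResult(task.id, app=...) should be the safe form when a separate result object is genuinely required.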