Tags: python, sqlalchemy, celery, pickle

Error in reading result of Celery Task using SQLAlchemy


I have a Flask app and I'm using Celery (v5.1.2) with SQLAlchemy to run background tasks on my worker.

This is how I am creating my Celery worker...

from celery import Celery

# username, password, hostname and db_name come from my config
backend_url = 'mysql://{}:{}@{}/{}'.format(username, password, hostname, db_name)
broker_url = 'sqla+' + backend_url
db_backend_url = 'db+' + backend_url

celery = Celery(
    app.import_name,  # app is the Flask application
    backend=backend_url,
    broker_url=broker_url,
    result_backend=db_backend_url,
    cache_backend=db_backend_url,
    task_serializer='json',
    result_serializer='json',
    include=['my_blueprints.profile'])

Initialising the worker and running my tasks works fine. The problem happens when I try to read the state of a result using:

bg_task = current_app.celery.AsyncResult(MY_TASK_ID)
bg_task_state = bg_task.state

I get the error _pickle.UnpicklingError: pickle data was truncated when trying to read bg_task.state.

I assume this has something to do with the fact that the task returns a large file of about 1 MB. While the task is running I can successfully read the task state using the two lines above, but the read fails once the task has completed.

task_serializer and result_serializer are both set to 'json' so I cannot understand why this is happening.


Solution

With the database result backend, Celery stores each task result in a BLOB column of the celery_taskmeta table, and a BLOB is limited to 64 KB in MySQL. Somewhere in your Celery logs you are probably also seeing a truncation warning from MySQL when the result is written to that table.

The Celery result isn't really intended for passing around large files; it's meant to hold some minimal details about the outcome of your task.

You don't give many details about your use case, but generally writing such a large binary blob to your database is a smell, or at least a headache waiting to happen some day.

A decent workaround would be to write the file to the file system, or upload it to your favorite elastic storage, and return the file name in the task's result. The concern there is to make sure your worker node and the node that needs the result have access to the same file system.
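
As a rough sketch of that workaround, reusing the celery app from your question (generate_report and build_report_bytes are made-up names, and SHARED_DIR is an assumed path mounted on both the worker and the web app), the task writes the bytes somewhere shared and returns only the path:

import os
import uuid

SHARED_DIR = '/mnt/shared/task-output'  # assumed: visible to both the worker and the web app

@celery.task
def generate_report(profile_id):
    data = build_report_bytes(profile_id)  # hypothetical helper that builds the ~1 MB payload
    file_path = os.path.join(SHARED_DIR, '{}.bin'.format(uuid.uuid4()))
    with open(file_path, 'wb') as fh:
        fh.write(data)
    # The stored result is now a tiny JSON document rather than the file itself.
    return {'file_path': file_path}

The caller then checks bg_task.state as before and, once it is 'SUCCESS', opens bg_task.result['file_path'] itself.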

You might also be able to get away with altering the celery_taskmeta table so the result column is a MEDIUMBLOB or LONGBLOB, but my instinct is that you'll eventually wish you weren't storing the files in the RDBMS anyway. You'll also have to make sure that alteration is reapplied every time you deploy your application from scratch.
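
If you do go that route, the alteration is a one-off statement along these lines (a sketch only, assuming the default celery_taskmeta schema where the column is named result, and reusing the connection details from your question):

from sqlalchemy import create_engine, text

engine = create_engine('mysql://{}:{}@{}/{}'.format(username, password, hostname, db_name))
with engine.begin() as conn:
    # LONGBLOB raises the per-value limit from 64 KB to 4 GB; MEDIUMBLOB would give 16 MB.
    conn.execute(text('ALTER TABLE celery_taskmeta MODIFY COLUMN result LONGBLOB'))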