Tags: python, flask, sqlalchemy, celery

Database is not updated in Celery task with Flask and SQLAlchemy


I'm writing a web application with Flask and SQLAlchemy. My program needs to process some stuff in the background and then mark it as processed in the database. Following the standard Flask/Celery example, I have something like this:

from flask import Flask, redirect, url_for
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        # Run every task inside the Flask application context.
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery


app = Flask(__name__)
# Broker and database configuration omitted here.
db = SQLAlchemy(app)

celery = make_celery(app)

class Stuff(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    processed = db.Column(db.Boolean)


@celery.task()
def process_stuff(stuff):
    # process stuff here

    stuff.processed = True
    db.session.commit()

@app.route("/process_stuff/<id>")
def do_process_stuff(id):
    stuff = Stuff.query.get_or_404(id)
    process_stuff.delay(stuff)
    return redirect(url_for("now_wait"))

I can access my database from process_stuff (e.g. queries like Stuff.query.get(some_id) work), but db.session.commit() does nothing: my stuff record is not updated. According to the Celery worker log, the commit occurs, but nothing changes in the database. Is there something wrong with my db.session.commit()? Is it possible to make such a commit work somehow?


Solution

  • Okay, I got it. The stuff object passed to process_stuff() is not attached to db.session in the worker, so the commit has nothing to write. I have to query the database explicitly inside process_stuff() to get a Stuff object that belongs to the session, like this:

    @celery.task()
    def process_stuff(stuff):
        # process stuff here

        # Re-load the row through the worker's own session so the
        # instance is attached to db.session.
        my_stuff = Stuff.query.get(stuff.id)

        my_stuff.processed = True
        db.session.commit()
    

    Now it works.
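
    To see why the commit appeared to do nothing, here is a minimal sketch that reproduces the effect with plain SQLAlchemy (1.4 or later) and an in-memory SQLite database. It is not the original Flask/Celery setup: the two Session objects only stand in for the web request and the Celery worker, closing the first session stands in for the object being serialized and handed to the worker, and the helper names process_detached, process_by_id, and read_processed are made up for this illustration.

```python
from sqlalchemy import Boolean, Column, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Stuff(Base):
    __tablename__ = "stuff"
    id = Column(Integer, primary_key=True)
    processed = Column(Boolean, default=False)


# Assumption: an in-memory SQLite database replaces the real one.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)


def read_processed(stuff_id):
    """Read the flag back through a fresh session (hypothetical helper)."""
    with Session(engine) as session:
        return session.get(Stuff, stuff_id).processed


def process_detached(stuff):
    """Mimics the original task: mutate an object the session does not track."""
    with Session(engine) as worker_session:
        stuff.processed = True
        worker_session.commit()  # nothing is pending, so no UPDATE is emitted


def process_by_id(stuff_id):
    """Mimics the fixed task: re-load the row through the worker's session."""
    with Session(engine) as worker_session:
        my_stuff = worker_session.get(Stuff, stuff_id)
        my_stuff.processed = True
        worker_session.commit()  # my_stuff is tracked, so the UPDATE is emitted


# "Web request": create and load the row, then leave the session.
with Session(engine) as web_session:
    web_session.add(Stuff(id=1, processed=False))
    web_session.commit()
    stuff = web_session.get(Stuff, 1)
    stuff_id = stuff.id
# Once the session is closed, stuff is detached from it.

process_detached(stuff)
before = read_processed(stuff_id)  # still False in the database

process_by_id(stuff_id)
after = read_processed(stuff_id)   # now True

print(before, after)  # prints: False True
```

    Since the task only needs the primary key to re-load the row, it is probably simpler to pass stuff.id to process_stuff.delay() rather than the whole model instance. That also avoids serializing an ORM object into the task message.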