python flask celery factory-pattern

Pass application context to celery in flask framework


I tried to add Celery to my existing Flask project. After adding it, I got a "working outside of application context" error at runtime. It seems that the Celery worker lacks my application context, but I am not sure where to pass the application context to the Celery worker in this case.

Here is my current structure (I tried to follow a factory pattern with blueprints and API documentation):

-run.py
-app
    -module1
      -controller.py
      -model.py
      -service.py
    -__init__.py
    -config.py

For the __init__.py

# __init__.py
import os
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from app.config import Config
from flask_restplus import Api
from celery import Celery

cors = CORS()
db = SQLAlchemy()
api = Api()
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL, include=["app.module1.service"])

def create_app(config_class=Config):
    app = Flask(__name__, static_url_path='')
    # Load the config that was passed in, not always the default Config
    app.config.from_object(config_class)

    cors.init_app(app)
    db.init_app(app)
    api.init_app(app=app)
    celery.conf.update(app.config)

    from app.module1.controller import blueprint
    from app.module1.controller import ns

    app.register_blueprint(blueprint)
    api.add_namespace(ns)

    return app

For the run.py

from app import create_app
app = create_app()
if __name__ == '__main__':
    app.run(threaded=True, debug=True)

For the service.py

from app import db, celery
@celery.task(bind=True)
def service1(self):
    # do something & return
    pass  # placeholder so the stub is valid Python

For the controller.py

from flask import Blueprint
from flask_restplus import Api, Resource
from app.module1.service import service1

blueprint = Blueprint('service', __name__)
apis = Api(app=blueprint)
ns = apis.namespace('service', 'service description')

@ns.route("/")
class SomeList(Resource):
    def get(self):
        # Queue the task on the Celery worker
        service1.apply_async()
        # return

Solution

  • I think the confusion comes from trying to "pass" an application context to the Celery worker. The Flask process cannot pass a context to the worker, because they are separate processes. The Celery worker process needs to create its own Flask application instance by calling create_app(), so that it can push its own app context whenever a task needs one.

    So for example, in your service1 task:

    from app import db, celery, create_app
    
    @celery.task(bind=True)
    def service1(self):
        app = create_app()
        with app.app_context():
            # do something & return
            pass  # placeholder so the stub is valid Python
    

    To make this a bit more efficient, you can create a single global app that is shared by all your tasks:

    from app import db, celery, create_app
    app = create_app()
    
    @celery.task(bind=True)
    def service1(self):
        with app.app_context():
            # do something & return
            pass  # placeholder so the stub is valid Python