I tried to add Celery to my existing Flask project. After adding it, I get a "working outside of application context" error at runtime. It seems that the Celery worker lacks my application context, but I am not sure where to pass the application context to the Celery worker in this case.
Here is my current structure (I tried to follow a factory pattern with blueprints and API documentation):
-run.py
-app
    -module1
        -controller.py
        -model.py
        -service.py
    -__init__.py
    -config.py
For __init__.py:
# __init__.py
import os

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from app.config import Config
from flask_restplus import Api
from celery import Celery

# Extensions are created unbound and attached to the app in create_app()
cors = CORS()
db = SQLAlchemy()
api = Api()
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL, include=["app.module1.service"])


def create_app(config_class=Config):
    app = Flask(__name__, static_url_path='')
    app.config.from_object(config_class)

    cors.init_app(app)
    db.init_app(app)
    api.init_app(app=app)
    celery.conf.update(app.config)

    # Imported inside the factory so the extensions above exist first
    from app.module1.controller import blueprint
    from app.module1.controller import ns
    app.register_blueprint(blueprint)
    api.add_namespace(ns)
    return app
For run.py:
from app import create_app

app = create_app()

if __name__ == '__main__':
    app.run(threaded=True, debug=True)
For service.py:
from app import db, celery


@celery.task(bind=True)
def service1(self):
    # do something & return
    pass
For controller.py:
from flask import Blueprint
from flask_restplus import Api, Resource

blueprint = Blueprint('service', __name__)
apis = Api(app=blueprint)
ns = apis.namespace('service', 'service description')


@ns.route("/")
class SomeList(Resource):
    def get(self):
        # Local import keeps controller and service free of an import cycle
        from app.module1.service import service1
        service1.apply_async()
        # return
I think the confusion comes from trying to "pass" an application context to the Celery worker. In reality, the Flask process cannot pass a context to the worker because they are different processes. The Celery worker process needs to create its own Flask application instance by calling create_app() so that it can push its own app contexts when needed.
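The error itself can be reproduced without Celery, which may help show what the worker is missing. This is only a minimal sketch: the create_app() below is a hypothetical stand-in for your factory, GREETING is a made-up config key, and task_body() stands in for whatever your task does with Flask state.

```python
from flask import Flask, current_app


def create_app():
    # Hypothetical stand-in for the project's real application factory.
    app = Flask("context_demo")
    app.config["GREETING"] = "hello"
    return app


def task_body():
    # Stand-in for a task body that touches Flask state via current_app.
    return current_app.config["GREETING"]


def run_without_context():
    # No app context has been pushed, so current_app cannot be resolved.
    try:
        return task_body()
    except RuntimeError as exc:
        return f"failed: {exc}"


def run_with_context():
    # The process builds its own app and pushes a context around the work.
    app = create_app()
    with app.app_context():
        return task_body()
```

Calling run_without_context() hits the same RuntimeError you are seeing in the worker, while run_with_context() returns the config value because the process created its own app and pushed a context first.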
So for example, in your service1 task:
from app import db, celery, create_app


@celery.task(bind=True)
def service1(self):
    # Build an app in the worker process and push its context
    app = create_app()
    with app.app_context():
        # do something & return
        pass
To make this a bit more efficient, you can create a single global app that is shared by all your tasks:
from app import db, celery, create_app

# Created once per worker process, when this module is imported
app = create_app()


@celery.task(bind=True)
def service1(self):
    with app.app_context():
        # do something & return
        pass