
Is there a way to run a single job instance across multiple workers in FastAPI?


I am using APScheduler to create background jobs in my FastAPI application. However, when I start the application with more than one worker, the job is duplicated across the workers, resulting in all kinds of errors and potential security flaws.

Here is a simplified example:


from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI

def some_job():
    print("hello world")

app = FastAPI()
scheduler = AsyncIOScheduler()

# This module is imported once per worker process, so with --workers 4
# the job ends up scheduled four times, once in each worker.
scheduler.add_job(some_job, "interval", seconds=5)


@app.get("/test")
async def test():
    return {"Hello": "world"}

scheduler.start()

and run it with:

    uvicorn main:app --workers 4

This results in "hello world" being printed four times every time the job is triggered, once per worker.

Is there a way to run just one instance of the job across all the workers (at the parent-process level)?

I researched some solutions online, but most of them rely on Celery and similar modules, or on lock files and shared memory locations. Both options are too complicated, and I prefer to keep the number of modules to a minimum.
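
For example, the lock-file approach I kept running into looks roughly like the sketch below (assuming a Unix-like system, since fcntl is not available on Windows; the /tmp/scheduler.lock path is just a placeholder): every worker attempts a non-blocking exclusive lock at startup, and only the one that wins it starts the scheduler.

import fcntl

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI

app = FastAPI()

def some_job():
    print("hello world")

def try_acquire_lock(path="/tmp/scheduler.lock"):
    # Open (or create) the lock file and try to take a non-blocking
    # exclusive lock on it; only one process on the machine can hold it.
    f = open(path, "w")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return f
    except BlockingIOError:
        f.close()
        return None

@app.on_event("startup")
def maybe_start_scheduler():
    # Every worker runs this hook, but only the one that wins the
    # lock actually starts the scheduler.
    lock = try_acquire_lock()
    if lock is not None:
        # Keep a reference to the lock file so the lock is held for
        # the lifetime of this worker.
        app.state.scheduler_lock = lock
        scheduler = AsyncIOScheduler()
        scheduler.add_job(some_job, "interval", seconds=5)
        scheduler.start()

Even this feels like more moving parts than I want.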


Solution

  • You could run the scheduler in the parent process and the FastAPI app in a separate child process using Python's multiprocessing:

    import multiprocessing

    import uvicorn
    from apscheduler.schedulers.background import BackgroundScheduler
    from fastapi import FastAPI

    app = FastAPI()

    @app.get("/")
    async def home():
        return {"Hello": "world"}

    def regular_function():
        print("Hello world")

    def run_app():
        # The child process runs the uvicorn master, which in turn
        # spawns the four worker processes.
        uvicorn.run("main:app", host="0.0.0.0", port=8000, workers=4)

    def main():
        app_process = multiprocessing.Process(target=run_app)
        app_process.start()

        # The scheduler lives only in the parent process, so the job
        # is scheduled exactly once regardless of the worker count.
        scheduler = BackgroundScheduler()
        scheduler.add_job(regular_function, "interval", seconds=5, max_instances=1)
        scheduler.start()

        # Block until the server process exits, keeping the parent
        # (and its scheduler thread) alive.
        app_process.join()

    if __name__ == "__main__":
        main()
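
Run this with python main.py rather than the uvicorn CLI: the parent process owns the scheduler, and the child process runs the uvicorn master, which spawns the four workers. Since the job now executes in a different process from the web workers, it fires exactly once per interval, but any state it needs to share with the application has to go through an external channel such as a database or a file.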