I am using APScheduler to create background jobs in my FastAPI application. However, when I start the application with more than one worker, the job is duplicated across the workers, resulting in all kinds of errors and potential security flaws.
Here is a simplified example:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI

def some_job():
    print("hello world")

app = FastAPI()

shed = AsyncIOScheduler()
shed.add_job(some_job, "interval", seconds=5)

@app.get("/test")
async def test():
    return {"Hello": "world"}

shed.start()
To run it, use: uvicorn main:app --workers 4
This results in hello world being printed 4 times every time the job is triggered.
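As a quick check (not part of the example above), printing the process ID makes the duplication visible; this variant of some_job assumes the same main.py as above:

import os

def some_job():
    # Each uvicorn worker imports main.py and starts its own scheduler,
    # so this prints 4 different PIDs every 5 seconds.
    print(f"hello world from PID {os.getpid()}")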
Is there a way to run a single instance of each job across all the workers (i.e., at the parent-process level)?
I researched some solutions online, but most of them use Celery or similar modules, or rely on lock files and shared memory locations. Both options are too complicated, and I prefer to use the minimum number of modules.
You could run the scheduler in the parent process and FastAPI in a separate child process by using Python's multiprocessing:
import multiprocessing

import uvicorn
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def home():
    return {"Hello": "world"}

def regular_function():
    print("Hello world")

def run_app():
    # uvicorn spawns its own 4 workers inside this child process;
    # none of them ever touches the scheduler.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, workers=4)

def main():
    # Run uvicorn (and its workers) in a child process.
    app_process = multiprocessing.Process(target=run_app)
    app_process.start()

    # The scheduler lives only in the parent process, so each job
    # runs exactly once per trigger.
    scheduler = BackgroundScheduler()
    scheduler.add_job(regular_function, "interval", seconds=5, max_instances=1)
    scheduler.start()

    app_process.join()

if __name__ == "__main__":
    main()
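With this layout you start everything with python main.py rather than the uvicorn command, since main() is what launches both uvicorn and the scheduler. A minimal sketch of a cleaner shutdown, assuming the same main() as above (my addition, untested against your setup): pressing Ctrl+C interrupts the parent's join(), at which point you can stop both pieces explicitly:

def main():
    app_process = multiprocessing.Process(target=run_app)
    app_process.start()

    scheduler = BackgroundScheduler()
    scheduler.add_job(regular_function, "interval", seconds=5, max_instances=1)
    scheduler.start()

    try:
        app_process.join()
    except KeyboardInterrupt:
        # Stop the scheduler, then end the uvicorn child process.
        scheduler.shutdown()
        app_process.terminate()
        app_process.join()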