I have an app using Uvicorn with FastAPI. I also have some connections open (e.g. to MongoDB). I want to gracefully close these connections once some signal occurs (SIGINT, SIGTERM and SIGKILL).
My server.py file:
import uvicorn
import fastapi
from fastapi.middleware.cors import CORSMiddleware
import signal
import asyncio
from source.gql import gql

app = fastapi.FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
app.mount("/graphql", gql)

# handle signals
HANDLED_SIGNALS = (
    signal.SIGINT,
    signal.SIGTERM,
)

loop = asyncio.get_event_loop()
for sig in HANDLED_SIGNALS:
    # _some_callback_func is a placeholder for the cleanup callback
    loop.add_signal_handler(sig, _some_callback_func)

if __name__ == "__main__":
    uvicorn.run(app, port=6900)
Unfortunately, the way I try to achieve this is not working. When I press Ctrl+C in the terminal, nothing happens. I believe this is because Uvicorn is started in a different thread...
What is the correct way of doing this? I have noticed the uvicorn.Server.install_signal_handlers() function, but had no luck using it...
FastAPI allows defining event handlers (functions) that need to be executed before the application starts up, or when the application is shutting down. Thus, you could use the shutdown event, as described here:
@app.on_event("shutdown")
def shutdown_event():
    # close connections here
    pass
Since startup and shutdown events are now deprecated (and might be removed in the future), one could use a lifespan function instead. Examples and details can be found in this answer, as well as here, here and here.