Tags: python, python-multithreading, fastapi, connection-pool

How to avoid exhausting the database connection pool when using FastAPI in threaded mode (with `def` instead of `async def`)


I use FastAPI for a production application that uses asyncio almost entirely, except when hitting the database. The database still relies on synchronous SQLAlchemy, as the async version was still in alpha (or early beta) at the time.

While our services do end up making synchronous blocking calls when they hit the database, those calls are still wrapped in async functions. We run multiple workers and several instances of the app to ensure we don't hit serious bottlenecks.

Concurrency with Threads

I understand that FastAPI offers concurrency using threads when using the `def controller_method` approach, but I can't seem to find any details on how it controls that environment. Could somebody help me understand how to control the maximum number of threads a process can spawn? What happens if it hits system limits?
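For context on what "controlling the maximum threads" means here: FastAPI (via Starlette) runs `def` endpoints in a bounded worker-thread pool, so thread count does not grow without limit. As a rough stdlib analogy (not FastAPI's actual mechanism, which is AnyIO's thread limiter with 40 tokens by default), a bounded `ThreadPoolExecutor` illustrates the principle:

```python
# Illustration only: FastAPI delegates to AnyIO's thread pool, not
# ThreadPoolExecutor, but the bounding principle is the same -- a fixed
# pool of worker threads caps how many blocking calls run at once.
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 4  # hypothetical cap, analogous to AnyIO's default 40 tokens


def handle_request(i: int) -> int:
    time.sleep(0.1)  # simulate a blocking DB call
    return i


with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    start = time.monotonic()
    # 8 tasks but only 4 workers: tasks run in two "waves" of 4
    results = list(pool.map(handle_request, range(8)))
    elapsed = time.monotonic() - start

print(results)   # order is preserved by map: [0, 1, 2, 3, 4, 5, 6, 7]
print(elapsed)   # at least ~0.2 s, since two waves of 0.1 s each
```

With a cap like this, a surge of requests queues work rather than spawning unbounded threads, which is the behavior the question is asking about.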

Database connections

When I use the async/await model, I create database connection objects in middleware, which are injected into the controller actions.

@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    await _set_request_id()
    try:
        # attach a per-request session to request.state
        request.state.db = get_sessionmaker(scope_func=None)
        response = await call_next(request)
    finally:
        # always return the session's connection to the pool
        if request.state.db.is_active:
            request.state.db.close()
    return response

When it's done via threads, is the controller already being called in a separate thread, ensuring a separate connection for each request?

Now, if I can't limit the number of threads spawned by the main process, and my application gets a sudden surge of requests, won't it overshoot the database connection pool limit and eventually block my application?

Is there a central thread pool used by FastAPI that I can configure, or is this controlled by Uvicorn?

Uvicorn

I see that Uvicorn has a configuration flag, --limit-concurrency 60, that lets it limit concurrency. Does this govern the number of concurrent threads created in threaded mode?

If so, should this always be lower than my total connection pool capacity (pool_size + max_overflow)?

So, in a scenario where I allow a Uvicorn concurrency limit of 60, should my DB connection pool configuration look something like this?

engine = sqlalchemy.create_engine(
    cfg("DB_URL"), 
    pool_size=40, 
    max_overflow=20, 
    echo=False, 
    pool_use_lifo=False,
    pool_recycle=120
)
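To make the arithmetic behind this concrete: pool_size=40 plus max_overflow=20 allows at most 60 checked-out connections, matching the concurrency limit of 60. Below is a simplified stdlib model of that checkout behavior (a sketch, not SQLAlchemy's actual `QueuePool`; the `ToyPool` class and its method names are hypothetical). In the real pool, the failing checkout raises `sqlalchemy.exc.TimeoutError` after `pool_timeout` seconds:

```python
# Simplified model of connection-pool checkout: pool_size + max_overflow
# connections may be out at once; any further checkout waits up to
# `timeout` seconds and then fails.
import threading


class ToyPool:
    def __init__(self, pool_size: int, max_overflow: int, timeout: float):
        # one semaphore slot per allowed concurrent connection
        self._slots = threading.BoundedSemaphore(pool_size + max_overflow)
        self._timeout = timeout

    def checkout(self) -> None:
        if not self._slots.acquire(timeout=self._timeout):
            raise TimeoutError("connection pool exhausted")

    def checkin(self) -> None:
        self._slots.release()


# pool_size=40, max_overflow=20 => 60 checkouts succeed, the 61st times out
pool = ToyPool(pool_size=40, max_overflow=20, timeout=0.1)
for _ in range(60):
    pool.checkout()

try:
    pool.checkout()
    exhausted = False
except TimeoutError:
    exhausted = True

print(exhausted)  # True
```

This is why capping request concurrency at or below pool capacity matters: if more threads than pool_size + max_overflow hit the database at once, the extras block on checkout and eventually error out.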

Is there a central thread pool being used in this case? Are there any sample projects I can look at to see how this could be configured when deployed at scale?

I've used Netflix Dispatch as a reference but if there are other projects I'd definitely want to look at those.


Solution

  • FastAPI uses Starlette as its underlying framework. Starlette runs def path operations in a thread pool, which it manages through AnyIO. Therefore, we can limit the number of threads that can execute simultaneously by setting the total_tokens property of AnyIO's CapacityLimiter.

    Example below:

    import threading
    import anyio
    import uvicorn
    from fastapi import FastAPI
    import time
    import logging
    
    THREADS_LIMIT = 5
    
    logging.basicConfig(level=logging.DEBUG)
    app = FastAPI()
    
    
    class Counter(object):
        """Thread-safe counter guarded by a lock."""
        def __init__(self):
            self._value = 0
            self._lock = threading.Lock()
    
        def increment(self):
            with self._lock:
                self._value += 1
    
        def decrement(self):
            with self._lock:
                self._value -= 1
    
        def value(self):
            with self._lock:
                return self._value
    
    
    counter = Counter()
    
    
    @app.get("/start_task")
    def start_task():
        counter.increment()
        logging.info("Route started. Counter: %d", counter.value())
        time.sleep(10)
        counter.decrement()
        logging.info("Route stopped. Counter: %d", counter.value())
        return "Task done"
    
    
    @app.on_event("startup")
    async def startup_event():
        # shrink AnyIO's default thread limiter (40 tokens by default)
        limiter = anyio.to_thread.current_default_thread_limiter()
        limiter.total_tokens = THREADS_LIMIT
    
    
    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")
    
    

    Try to open 50 connections in parallel:

    seq 1 50 | xargs -n1 -P50  curl "http://localhost:8000/start_task" 
    

    We see that the number of simultaneously processed requests is limited to 5.

    Output:

    INFO:root:Route started. Counter: 1
    INFO:root:Route started. Counter: 2
    INFO:root:Route started. Counter: 3
    INFO:root:Route started. Counter: 4
    INFO:root:Route started. Counter: 5
    INFO:root:Route stopped. Counter: 4
    INFO:uvicorn.access:127.0.0.1:60830 - "GET /start_task HTTP/1.1" 200
    INFO:root:Route stopped. Counter: 3
    INFO:root:Route started. Counter: 4
    INFO:uvicorn.access:127.0.0.1:60832 - "GET /start_task HTTP/1.1" 200
    INFO:root:Route started. Counter: 5
    ...