Tags: python, django, sqlalchemy, fastapi, connection-pooling

How are connections handled by SQLAlchemy internally


Context

Hello, I mostly use Django for my main backend monolith. In Django you don't need to handle DB connections on your own; that is abstracted away (Django uses the Active Record pattern instead of the Data Mapper pattern used by SQLAlchemy).

I have been doing some reading and I feel like I know the basics of the different SQLAlchemy constructs (engine, connection, and session). Nonetheless, as we have started to scale our FastAPI + SQLAlchemy apps, I have seen this error appearing:

sqlalchemy.exc.TimeoutError - anyio/streams/memory.py:92 - QueuePool limit of size 8 overflow 4 reached, connection timed out, timeout 10.00

I would like to understand why that is happening.

Current setup

Right now we have instances of the web server running, using the following command:

python -m uvicorn somemodule.api.fast_api.main:app --host 0.0.0.0

As you can see, I'm not setting the --workers flag, so uvicorn is only using 1 worker. On the SQLAlchemy engine, we are using the following options:

SQLALCHEMY_ENGINE_OPTIONS = {
    "pool_pre_ping": True,
    "pool_size": 16,
    "max_overflow": 4,
    "pool_timeout": 10,
    "pool_recycle": 300,
}

from sqlalchemy import create_engine

engine = create_engine(get_db_url(), **SQLALCHEMY_ENGINE_OPTIONS)

Questions

Q1: Does this apply to every uvicorn worker?

My guess is that if I spin up two uvicorn workers, the connection pool is not shared between them, so if the pool size is set to 16 and I have two workers, the maximum number of connections (without considering the max overflow) would be 32. Am I correct?
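
For concreteness, the multi-worker variant of the command would be (using uvicorn's standard --workers flag):

python -m uvicorn somemodule.api.fast_api.main:app --host 0.0.0.0 --workers 2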

Q2: How do the connections play with async?

Right now I'm doing the usual thing of creating a session like this:

from typing import Annotated

from fastapi import Depends
from sqlalchemy.orm import Session, sessionmaker

# SessionLocal is our sessionmaker bound to the engine defined above.
SessionLocal = sessionmaker(bind=engine)


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


DBSessionDependency = Annotated[Session, Depends(get_db)]

From my current understanding, the session uses a connection from the pool. We open a new session at the beginning of the request and it gets closed at the end of the request, so every request gets a clean-state session (while recycling connections from the pool).

Now, in an async FastAPI app we can handle many more requests concurrently, so I guess the pool size is a bottleneck, and maybe because of that we are running out of available connections in the pool. Is this correct? My train of thought is: uvicorn can accept, let's say, 200 requests at the same time; those requests start being processed, so every request gets assigned a session with a connection attached from the pool.

So I guess that to handle a lot of requests in async FastAPI I also need a much bigger pool size (or use PgBouncer and have fewer connections shared between all of the workers?).

Q3: Is it really an advantage to create a new session for every request?

We are using the repository pattern, so DB access for every model is centralized in a class. Wouldn't it be enough to just create a session from inside that class and be done with it? Why do I need to bother with dependency injection in FastAPI and passing the session around, or with something like context vars to share the session object with the repositories?

Let me know if you need any clarification and thanks in advance!! :D


Solution

  • There is a very handy logging setting you can configure to emit pool events to the log, which lets you see exactly what is happening: logging-reset-on-return-events

    You can also turn on echo=True on the engine itself to see the regular queries and transaction handling logged as well. (There is a LOT of output, so this is usually only helpful if you are controlling the requests going through.)

    sqlalchemy.create_engine.params.echo
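
    For example, both options go straight on create_engine (echo_pool also accepts "debug" for the most verbose checkout/checkin logging):

    from sqlalchemy import create_engine

    engine = create_engine(
        get_db_url(),
        echo=True,           # log SQL statements and transaction boundaries
        echo_pool="debug",   # log pool checkouts, checkins, and resets
    )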

    I'm not an expert in this area, especially async, but I think I can mostly answer these questions.

    Q1:

    • By default a pool doesn't actually create connections until it is asked for one. So if you spin up 10 workers, there are 0 connections until, inside a worker, someone asks the pool for one.
    • Otherwise, as I understand it, 32 is correct for 2 workers with a pool size of 16 (see the quick arithmetic below).
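
    As a back-of-the-envelope check (plain Python, with the numbers from your config):

    workers = 2
    pool_size = 16      # persistent connections each pool keeps
    max_overflow = 4    # extra connections a pool may open under load

    # Each uvicorn worker is a separate process with its own engine and pool,
    # so the per-database ceiling multiplies by the worker count.
    per_worker_max = pool_size + max_overflow   # 20
    total_max = workers * per_worker_max        # 2 * 20 = 40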

    Q2:

    Again, as I understand it, when you create a new Session not much happens (including the creation of a connection) until you actually use it: session.execute(), session.flush(), etc. So there shouldn't be a huge cost in making a Session for every request, even if you don't use it at all.

    Regarding async, you are probably going to hit trade-offs just like you would with threads. The database itself usually has a set limit on connections; to increase that limit, the database sets aside more resources, just like increasing the number of web workers requires more RAM and CPU. I guess at a certain point you'd need PgBouncer to start sharing connections between servers and to deal with all those complexities. It all depends on what is "cheap" in this situation. Maybe it is better to add more caching to avoid hitting the database so much?

    Also, not every "thread" or "task" will be using a connection at any given instant, though that depends on your app. At any moment some connections will be going back to the pool and some will be coming out of it.
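
    If you do move to SQLAlchemy's async support, the pooling story is essentially the same: a connection is only checked out of the pool when the session first executes something, and pool_size/max_overflow still apply per process. Here is a minimal sketch, assuming SQLAlchemy 2.0 and the asyncpg driver (the URL is a placeholder):

    from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

    engine = create_async_engine(
        "postgresql+asyncpg://user:password@host:5432/dbname",  # placeholder
        pool_size=16,
        max_overflow=4,
        pool_timeout=10,
    )
    SessionLocal = async_sessionmaker(engine, expire_on_commit=False)

    # FastAPI dependency: one AsyncSession per request, closed automatically.
    async def get_db():
        async with SessionLocal() as session:
            yield session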

    Q3:

    Usually a session uses a transaction, and you want transactions to start and end as quickly as possible while still being coherent for rollbacks. I.e. you don't want a new customer created but their order discarded (rolled back) in the same session. Depending on how you have transactions configured and what DB you are using, you also don't want a bunch of rollbacks or timeouts because different threads/tasks are trying to manipulate the same data at the same time or are holding a transaction open for a long time, so you want the transactions to be as short as possible.

    The session also provides some caching, but you don't want to keep things in the session too long because they get stale, so again there is a trade-off. Usually running one session, and usually a single transaction, through a single "web request" is adequate. You can SELECT a user account, use it to do some work, and then commit that work, or just display a result if you were only making read-only queries. Then everything can be thrown out of the "cache" and you start over. While you are doing that, though, you don't want other "web requests" messing up the state in the session you are trying to work with. So one session per request is a pretty good fit.

    Regarding where the session handling should go, i.e. why not put it in the db class: the short answer is to keep session management separate from data manipulation. This recommendation is discussed towards the bottom of: when-do-i-construct-a-session-when-do-i-commit-it-and-when-do-i-close-it
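
    As a sketch of what that separation can look like (the class and method names here are made up; Trip is the mapped class from the test code below), the repository receives a Session instead of creating one, and the FastAPI dependency keeps owning the session's lifecycle:

    from sqlalchemy import select
    from sqlalchemy.orm import Session

    class TripRepository:
        """Data access only; the caller owns the session's lifetime."""

        def __init__(self, session: Session):
            self.session = session

        def get(self, trip_id: int):
            return self.session.get(Trip, trip_id)

        def all(self):
            return list(self.session.scalars(select(Trip)))

    A route handler would then build the repository with the session from your dependency (repo = TripRepository(db)), so the session, and the transaction behind it, still begins and ends with the request.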

    Here is some test code showing how the default Session works. I use logging to generate the output and set the levels in code instead of using the echo shortcut provided by SQLAlchemy. Trip is just a simple ORM class mapped to a table named trips.

    
    # Imports to make the snippet self-contained.
    from sqlalchemy import BigInteger, Column, create_engine, select
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    # Engine is defined like this.
    def get_engine(env):
        return create_engine(
            f"postgresql+psycopg2://{env['DB_USER']}:{env['DB_PASSWORD']}"
            f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}"
        )

    #...

    class Trip(Base):
        __tablename__ = 'trips'
        id = Column(BigInteger, primary_key=True)
    
    #...
    
    def query(engine):
        import logging
        FORMAT = '| %(name)s | %(message)s |'
        logging.basicConfig(format=FORMAT)
    
        # Configure sqlalchemy loggers and ourselves to emit DEBUG and above statements.
        # Similar to using echo='debug' and echo_pool='debug'.
        for logger_name in ('sqlalchemy.engine', 'sqlalchemy.pool', __name__):
            logging.getLogger(logger_name).setLevel(logging.DEBUG)
    
        # Use our logger now to highlight locations in code.
        logger = logging.getLogger(__name__)
    
        print('|name|message|')
        print('|----|-------|')
        logger.debug('starting flush() block')
        with Session(engine) as session:
            logger.debug(('inner - before add', f'in_transaction={session.in_transaction()}'))
            session.add(Trip())
            logger.debug(('inner - before flush', f'in_transaction={session.in_transaction()}'))
            session.flush()
            logger.debug(('inner - before commit', f'in_transaction={session.in_transaction()}'))
            session.commit()
            logger.debug(('inner - before end of block', f'in_transaction={session.in_transaction()}'))
    
        logger.debug('starting execute() block')
        with Session(engine) as session:
            logger.debug(('inner - before select', f'in_transaction={session.in_transaction()}'))
            trips = session.scalars(select(Trip)).all()
            logger.debug(('inner - before end of block', f'in_transaction={session.in_transaction()}'))
    
        logger.debug('starting empty block')
        with Session(engine) as session:
            logger.debug('empty block')
    
        logger.debug('starting add() only block')
        with Session(engine) as session:
            logger.debug(('inner - before add', f'in_transaction={session.in_transaction()}'))
            session.add(Trip())
            logger.debug(('inner - before end of block', f'in_transaction={session.in_transaction()}'))
    
        logger.debug('starting begin block')
        with Session(engine) as session:
            logger.debug(('inner - before begin()', f'in_transaction={session.in_transaction()}'))
            with session.begin():
                logger.debug(('inner inner - before add', f'in_transaction={session.in_transaction()}'))
                session.add(Trip())
                logger.debug(('inner inner - before end of block', f'in_transaction={session.in_transaction()}'))
            logger.debug(('inner - before end of block', f'in_transaction={session.in_transaction()}'))
    
        logger.debug('finished')
    
    | name | message |
    |------|---------|
    | main | starting flush() block |
    | main | ('inner - before add', 'in_transaction=False') |
    | main | ('inner - before flush', 'in_transaction=True') |
    | sqlalchemy.pool.impl.QueuePool | Connection <connection object at 0x7f34b457cf40; dsn: 'user=so_testing password=xxx dbname=so_testing host=so_testing_db port=5432', closed: 0> checked out from pool |
    | sqlalchemy.engine.Engine | BEGIN (implicit) |
    | sqlalchemy.engine.Engine | INSERT INTO trips DEFAULT VALUES RETURNING trips.id |
    | sqlalchemy.engine.Engine | [generated in 0.00008s] {} |
    | sqlalchemy.engine.Engine | Col ('id',) |
    | sqlalchemy.engine.Engine | Row (1,) |
    | main | ('inner - before commit', 'in_transaction=True') |
    | sqlalchemy.engine.Engine | COMMIT |
    | sqlalchemy.pool.impl.QueuePool | Connection <connection object at 0x7f34b457cf40; dsn: 'user=so_testing password=xxx dbname=so_testing host=so_testing_db port=5432', closed: 0> being returned to pool |
    | sqlalchemy.pool.impl.QueuePool | Connection <connection object at 0x7f34b457cf40; dsn: 'user=so_testing password=xxx dbname=so_testing host=so_testing_db port=5432', closed: 0> rollback-on-return |
    | main | ('inner - before end of block', 'in_transaction=False') |
    | main | starting execute() block |
    | main | ('inner - before select', 'in_transaction=False') |
    | sqlalchemy.pool.impl.QueuePool | Connection <connection object at 0x7f34b457cf40; dsn: 'user=so_testing password=xxx dbname=so_testing host=so_testing_db port=5432', closed: 0> checked out from pool |
    | sqlalchemy.engine.Engine | BEGIN (implicit) |
    | sqlalchemy.engine.Engine | SELECT trips.id FROM trips |
    | sqlalchemy.engine.Engine | [generated in 0.00006s] {} |
    | sqlalchemy.engine.Engine | Col ('id',) |
    | sqlalchemy.engine.Engine | Row (1,) |
    | main | ('inner - before end of block', 'in_transaction=True') |
    | sqlalchemy.engine.Engine | ROLLBACK |
    | sqlalchemy.pool.impl.QueuePool | Connection <connection object at 0x7f34b457cf40; dsn: 'user=so_testing password=xxx dbname=so_testing host=so_testing_db port=5432', closed: 0> being returned to pool |
    | sqlalchemy.pool.impl.QueuePool | Connection <connection object at 0x7f34b457cf40; dsn: 'user=so_testing password=xxx dbname=so_testing host=so_testing_db port=5432', closed: 0> rollback-on-return |
    | main | starting empty block |
    | main | empty block |
    | main | starting add() only block |
    | main | ('inner - before add', 'in_transaction=False') |
    | main | ('inner - before end of block', 'in_transaction=True') |
    | main | starting begin block |
    | main | ('inner inner - before add', 'in_transaction=True') |
    | main | ('inner inner - before end of block', 'in_transaction=True') |
    | sqlalchemy.pool.impl.QueuePool | Connection <connection object at 0x7f34b457cf40; dsn: 'user=so_testing password=xxx dbname=so_testing host=so_testing_db port=5432', closed: 0> checked out from pool |
    | sqlalchemy.engine.Engine | BEGIN (implicit) |
    | sqlalchemy.engine.Engine | INSERT INTO trips DEFAULT VALUES RETURNING trips.id |
    | sqlalchemy.engine.Engine | [cached since 0.004129s ago] {} |
    | sqlalchemy.engine.Engine | Col ('id',) |
    | sqlalchemy.engine.Engine | Row (2,) |
    | sqlalchemy.engine.Engine | COMMIT |
    | sqlalchemy.pool.impl.QueuePool | Connection <connection object at 0x7f34b457cf40; dsn: 'user=so_testing password=xxx dbname=so_testing host=so_testing_db port=5432', closed: 0> being returned to pool |
    | sqlalchemy.pool.impl.QueuePool | Connection <connection object at 0x7f34b457cf40; dsn: 'user=so_testing password=xxx dbname=so_testing host=so_testing_db port=5432', closed: 0> rollback-on-return |
    | main | ('inner - before end of block', 'in_transaction=False') |
    | main | finished |