Search code examples
pythonpostgresqlgoogle-cloud-platformsqlalchemygoogle-cloud-sql

SQLAlchemy and Google Cloud Sql session pool


I am creating an async fastAPI microservice using graphql(strawberry). My database is hosted on GoogleCloudSQL and it is a postgres database. My microservice is working really good on local with local db but now when I built my connector to the GoogleCloudSQL it doesn't work that well anymore. My question is, how can I create a session pool and yeild session to each of my requests ?

Here are some code snippets:

`
# [START cloud_sql_postgres_sqlalchemy_connect_connector]
import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from google.cloud.sql.connector import IPTypes, create_async_connector
import asyncpg


async def connect_with_connector() -> AsyncEngine:
    instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]  # e.g.         'project:region:instance'
    db_user = os.environ["DB_USER"]  # e.g. 'my-db-user'
    db_pass = os.environ["DB_PASS"]  # e.g. 'my-db-password'
    db_name = os.environ["DB_NAME"]  # e.g. 'my-database'

    ip_type = IPTypes.PRIVATE if os.environ.get("PRIVATE_IP") else IPTypes.PUBLIC

    # initialize Cloud SQL Python Connector object
    connector = await create_async_connector()

    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect_async(
            instance_connection_name,
            "asyncpg",
            user=db_user,
            password=db_pass,
            db=db_name,
            ip_type=IPTypes.PUBLIC
        )

        return conn

    # The Cloud SQL Python Connector can be used with SQLAlchemy
    # using the 'creator' argument to 'create_engine'

    connection = await getconn()

    pool = create_async_engine(
        "postgresql+asyncpg://",
        creator=connection,
        # [START_EXCLUDE]
        # Pool size is the maximum number of permanent connections to keep.
        pool_size=5,

        # Temporarily exceeds the set pool_size if no connections are available.
        max_overflow=2,

        # The total number of concurrent connections for your application will be
        # a total of pool_size and max_overflow.

        # 'pool_timeout' is the maximum number of seconds to wait when retrieving a
        # new connection from the pool. After the specified amount of time, an
        # exception will be thrown.
        pool_timeout=30,  # 30 seconds

        # 'pool_recycle' is the maximum number of seconds a connection can persist.
        # Connections that live longer than the specified amount of time will be
        # re-established
        pool_recycle=1800,  # 30 minutes
        # [END_EXCLUDE]
    )
    return pool`

And this is my session generator

    `@asynccontextmanager
    async def get_session() -> AsyncSession:
        engine = await connect_with_connector()
        async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
        )
        async with async_session() as session:
            yield session`

When I try to execute my query like this:

async with get_session() as session:
    selected = await session.execute(selectable)

I recieve this error: "'Connection' object is not callable"

Even if when I debug I see that the session is of type AsyncSession


Solution

  • I was struggling massively with this recently but found a solution on the Github repo: https://github.com/sqlalchemy/sqlalchemy/issues/8215#issuecomment-1324956281.

    Since the sqlalchemy create_async_engine function does not support an async creator as of 03/02/23, you have to wrap the Google CloudSQL Connection object in an "asyncifying" function like so:

    import os
    from google.cloud.sql.connector import create_async_connector
    from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
    from sqlalchemy.util import await_only
    
    async def get_async_engine(
        db_host: str,
        db_name: str,
        db_pass: str,
        db_port: int,
        db_user: str
    ) -> AsyncEngine:
        connector = await create_async_connector()
    
        def asyncify_connection():
            from sqlalchemy.dialects.postgresql.asyncpg import (
                AsyncAdapt_asyncpg_connection,
            )
            
            connection = connector.connect_async(
                db_host,
                "asyncpg",
                db=db_name,
                password=db_pass,
                port=db_port,
                user=db_user,
            )        
    
            return AsyncAdapt_asyncpg_connection(
                engine.dialect.dbapi,
                await_only(connection),
                prepared_statement_cache_size=100,
            )
    
        return create_async_engine(
            "postgresql+asyncpg://",
            echo=True,
            creator=asyncify_connection,
        )
    

    This solution is rather hacky as is, but hopefully sqlalchemy can be extended soon to allow for an async creator.

    Edit-08/03/23: Changed the code snippet as the previous code was prone to bugs due to awaiting an already awaited coroutine when the function is used concurrently.