I am creating an async FastAPI microservice using GraphQL (Strawberry). My database is a Postgres instance hosted on Google Cloud SQL. My microservice works really well locally against a local DB, but now that I have built my connector to Google Cloud SQL it no longer works reliably. My question is: how can I create a session pool and yield a session to each of my requests?
Here are some code snippets:
```python
# [START cloud_sql_postgres_sqlalchemy_connect_connector]
import os

import asyncpg
from google.cloud.sql.connector import IPTypes, create_async_connector
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine


async def connect_with_connector() -> AsyncEngine:
    instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]  # e.g. 'project:region:instance'
    db_user = os.environ["DB_USER"]  # e.g. 'my-db-user'
    db_pass = os.environ["DB_PASS"]  # e.g. 'my-db-password'
    db_name = os.environ["DB_NAME"]  # e.g. 'my-database'

    ip_type = IPTypes.PRIVATE if os.environ.get("PRIVATE_IP") else IPTypes.PUBLIC

    # initialize Cloud SQL Python Connector object
    connector = await create_async_connector()

    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect_async(
            instance_connection_name,
            "asyncpg",
            user=db_user,
            password=db_pass,
            db=db_name,
            ip_type=ip_type,
        )
        return conn

    # The Cloud SQL Python Connector can be used with SQLAlchemy
    # using the 'creator' argument to 'create_engine'
    connection = await getconn()
    pool = create_async_engine(
        "postgresql+asyncpg://",
        creator=connection,
        # [START_EXCLUDE]
        # Pool size is the maximum number of permanent connections to keep.
        pool_size=5,
        # Temporarily exceeds the set pool_size if no connections are available.
        max_overflow=2,
        # The total number of concurrent connections for your application will
        # be a total of pool_size and max_overflow.
        # 'pool_timeout' is the maximum number of seconds to wait when
        # retrieving a new connection from the pool. After the specified amount
        # of time, an exception will be thrown.
        pool_timeout=30,  # 30 seconds
        # 'pool_recycle' is the maximum number of seconds a connection can
        # persist. Connections that live longer than the specified amount of
        # time will be re-established.
        pool_recycle=1800,  # 30 minutes
        # [END_EXCLUDE]
    )
    return pool
```
And this is my session generator:

```python
@asynccontextmanager
async def get_session() -> AsyncSession:
    engine = await connect_with_connector()
    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    async with async_session() as session:
        yield session
```
When I try to execute my query like this:

```python
async with get_session() as session:
    selected = await session.execute(selectable)
```

I receive this error: `'Connection' object is not callable`, even though when I debug I can see that the session is of type `AsyncSession`.
I was struggling massively with this recently but found a solution on the GitHub repo: https://github.com/sqlalchemy/sqlalchemy/issues/8215#issuecomment-1324956281.

The error itself comes from `creator=connection`: `creator` expects a zero-argument callable that returns a fresh connection, but your code awaits `getconn()` once and hands the resulting `Connection` object to the engine, which then tries to call it. Since SQLAlchemy's `create_async_engine` function does not support an async creator as of 03/02/23, you have to wrap the Google Cloud SQL connection coroutine in an "asyncifying" function like so:
```python
import os

from google.cloud.sql.connector import create_async_connector
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlalchemy.util import await_only


async def get_async_engine(
    db_host: str,
    db_name: str,
    db_pass: str,
    db_port: int,
    db_user: str,
) -> AsyncEngine:
    connector = await create_async_connector()

    def asyncify_connection():
        from sqlalchemy.dialects.postgresql.asyncpg import (
            AsyncAdapt_asyncpg_connection,
        )

        connection = connector.connect_async(
            db_host,
            "asyncpg",
            db=db_name,
            password=db_pass,
            port=db_port,
            user=db_user,
        )
        # 'creator' is only invoked lazily, after 'engine' has been
        # assigned below, so the closure reference is safe.
        return AsyncAdapt_asyncpg_connection(
            engine.dialect.dbapi,
            await_only(connection),
            prepared_statement_cache_size=100,
        )

    engine = create_async_engine(
        "postgresql+asyncpg://",
        echo=True,
        creator=asyncify_connection,
    )
    return engine
```
This solution is rather hacky as is, but hopefully SQLAlchemy will be extended soon to allow for an async creator.

Edit (08/03/23): Changed the code snippet, as the previous version was prone to bugs caused by awaiting an already-awaited coroutine when the function was used concurrently.