I am trying to listen to the engine_connect
event, grab the cursor on that connection, then issue a SET
query, however I can't seem to get the parameter replacement to work. I ONLY have this issue with async engine which uses from sqlalchemy.dialects.postgresql.asyncpg import AsyncAdapt_asyncpg_cursor
as its cursor.
Notice that I am able to use parameter substitution in a SELECT query, but not in a SET query.
Here's the error:
sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError: <class 'asyncpg.exceptions.PostgresSyntaxError'>: syntax error at or near "$1"
and heres a code snippet to reproduce
import asyncio
from sqlalchemy import event
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
db_url = make_url('postgresql://xxx:yyy@127.0.0.1/my_db')
async_db_url = db_url.set(drivername="postgresql+asyncpg")
engine = create_async_engine(
async_db_url,
pool_timeout=5,
pool_pre_ping=True,
pool_recycle=600,
future=True,
)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False, future=True,)
def foobar_engine_connect_listener(connection, branch):
identity = "arn:aws:sts::123456:assumed-role/thing_1"
cursor = connection.connection.cursor()
# THIS WORKS
cursor.execute(
"select %s", (identity,)
)
# THIS THROWS THE ERROR
cursor.execute(
"set session iam.identity = %s", (identity,)
)
event.listens_for(engine.sync_engine, "engine_connect")(foobar_engine_connect_listener)
async def go():
async with async_session() as session:
await session.execute('select 1')
if __name__ == "__main__":
asyncio.run(go())
The solution ended up being set_config with %s param format:
cursor.execute("select set_config('iam.identity', %s, false)", (identity,))