Tags: python, postgresql, sqlalchemy, asyncpg

SQLAlchemy async event listen engine.sync_engine SET string parameter replacement


I am trying to listen to the engine_connect event, grab a cursor on that connection, and then issue a SET query, but I can't get parameter substitution to work. I ONLY have this issue with the async engine, which uses AsyncAdapt_asyncpg_cursor (from sqlalchemy.dialects.postgresql.asyncpg) as its cursor. Note that parameter substitution works in a SELECT query, but not in a SET query.

Here's the error:

sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError: <class 'asyncpg.exceptions.PostgresSyntaxError'>: syntax error at or near "$1"

And here's a code snippet to reproduce it:

import asyncio

from sqlalchemy import event
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

db_url = make_url('postgresql://xxx:yyy@127.0.0.1/my_db')

async_db_url = db_url.set(drivername="postgresql+asyncpg")
engine = create_async_engine(
    async_db_url,
    pool_timeout=5,
    pool_pre_ping=True,
    pool_recycle=600,
    future=True,
)

async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False, future=True,)


def foobar_engine_connect_listener(connection, branch):
    identity = "arn:aws:sts::123456:assumed-role/thing_1"
    cursor = connection.connection.cursor()
    
    # THIS WORKS 
    cursor.execute(
        "select %s", (identity,)
    )
    
    # THIS THROWS THE ERROR
    cursor.execute(
        "set session iam.identity = %s", (identity,)
    )


event.listens_for(engine.sync_engine, "engine_connect")(foobar_engine_connect_listener)


async def go():
    async with async_session() as session:
        await session.execute('select 1')


if __name__ == "__main__":
    asyncio.run(go())


Solution

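  • The solution ended up being set_config() with the %s param format instead of SET. PostgreSQL does not accept bind parameters in a SET statement, and asyncpg sends parameters to the server as $1 placeholders rather than interpolating them client-side, which is why the server reports a syntax error at "$1". set_config() is an ordinary function call, so its arguments can be bound; passing false as the third argument applies the setting to the whole session rather than only the current transaction: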

    cursor.execute("select set_config('iam.identity', %s, false)", (identity,))
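For context, here is a minimal sketch of how the listener from the question might look with that change applied. The helper name set_session_identity is hypothetical (it is not part of SQLAlchemy or asyncpg); the listener signature, the identity value, and the way the cursor is obtained are copied from the question, and the registration line is left as a comment because it needs the engine defined there.

```python
def set_session_identity(cursor, identity):
    """Set the custom iam.identity setting for the current session.

    Hypothetical helper: set_config() is called through a SELECT, so the
    value can be passed as a bound parameter, which a plain SET statement
    does not allow. The third argument (false) makes the setting
    session-wide rather than local to the current transaction.
    """
    cursor.execute(
        "select set_config('iam.identity', %s, false)",
        (identity,),
    )


def foobar_engine_connect_listener(connection, branch):
    # Same identity value and cursor access as in the question.
    identity = "arn:aws:sts::123456:assumed-role/thing_1"
    cursor = connection.connection.cursor()
    set_session_identity(cursor, identity)


# Registration, as in the question (requires the `engine` created there):
# event.listens_for(engine.sync_engine, "engine_connect")(
#     foobar_engine_connect_listener
# )
```

Keeping the SQL in a small helper means the statement can be checked against a stand-in cursor without a running database; whether that split is worth it depends on how many settings need to be applied on connect.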