Search code examples
python-3.xsqlalchemypython-asyncio

SqlAlchemy asyncio orm: How to query the database


In SqlAlchemy async orm engine how do I query a table and get a value or all?

I know from the non async methods that I can just do

SESSION.query(TableClass).get(x)

but trying this with the async methods it throws the next error:

AttributeError: 'AsyncSession' object has no attribute 'query'.

Here's my SESSION variable defined. The LOOP variable is just asyncio.get_event_loop() used to start async methods when my sql module is loaded and populate variables used as a cache to avoid caching the database every time I need somethin:

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

from .. import CONFIG, LOOP

def _build_async_db_uri(uri):
    if "+asyncpg" not in uri:
        return '+asyncpg:'.join(uri.split(":", 1))
    return uri

async def start() -> declarative_base:
    engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
    async with engine.begin() as conn:
        BASE.metadata.bind = engine
        await conn.run_sync(BASE.metadata.create_all)
    return scoped_session(sessionmaker(bind=engine, autoflush=False, class_=AsyncSession))


BASE = declarative_base()
SESSION = LOOP.run_until_complete(start())

Here's an example of table and cache function:

class TableClass:
    __tablename__ = "tableclass"
    id = Column(Integer, primary_key = True)
    alias = Column(Integer)

CACHE = {}
async def _load_all():
    global CACHE
    try:
        curr = await SESSION.query(TableClass).all()
        CACHE = {i.id: i.alias for i in curr}

LOOP.run_until_complete(_load_all())

Solution

  • session.query is the old API. The asynchronous version uses select and accompanying methods.

    from sqlalchemy.future import select
    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.orm import sessionmaker
    
    
    engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
    async_session = sessionmaker(
        engine, expire_on_commit=False, class_=AsyncSession
    )
    
    
    CACHE = {}
    async def _load_all():
        global CACHE
        try:
            async with async_session() as session:
                q = select(TableClass)
                result = await session.execute(q)
                curr = result.scalars()
                CACHE = {i.id: i.alias for i in curr}
        except:
            pass
    
    LOOP.run_until_complete(_load_all())
    

    You can read more about SqlAlchemy Asynchronous I/O here