Search code examples
python asynchronous sqlalchemy celery fastapi

How to use AsyncSession from sqlalchemy in celery tasks?


Use AsyncSession in celery tasks

I use FastAPI and SQLAlchemy. I need to create a Celery task that goes to the database and checks whether any rows of my Event table have end_time < datetime.now().

Here is my code:

@asynccontextmanager
async def scoped_session():
    """Async context manager that yields a session from a scoped registry.

    Builds an ``async_scoped_session`` around ``async_session``, yields one
    session obtained from it, and calls ``remove()`` in the ``finally``
    clause once the caller's ``async with`` body has finished.
    """
    scoped_factory = async_scoped_session(
        async_session,
        # NOTE(review): ``asyncio.current_task()`` is *called* here, so
        # ``scopefunc`` receives a Task object rather than a callable. That
        # matches the "'_asyncio.Task' object is not callable" error reported
        # for this snippet; passing ``asyncio.current_task`` (no parentheses)
        # is presumably what was intended.
        scopefunc=asyncio.current_task()
    )
    try:
        async with scoped_factory() as s:
            yield s
    finally:
        # NOTE(review): this calls ``remove()`` on the object returned by
        # ``scoped_factory()`` rather than on the registry itself; the
        # SQLAlchemy docs show ``await scoped_factory.remove()`` — verify.
        await scoped_factory().remove()


async def logic():
    """Print ``is_event_done`` for each fetched row of an already-ended event.

    Selects ``Event`` rows whose ``end_time`` is at or before the current
    time, using a session obtained from ``scoped_session()``.
    """
    async with scoped_session() as db:
        event_model = event.models.Event
        # Cut-off is computed in Python when the statement is built.
        cutoff = datetime.now()
        query = select(event_model).where(event_model.end_time <= cutoff)
        outcome = await db.execute(query)
        rows = outcome.fetchall()
        for row in rows:
            print(row.is_event_done)


@celery.task(name='is_event_done', bind=True, ignore_result=True)
def is_event_done(self):
    """Celery task entry point: run the async ``logic()`` to completion."""
    # The task body is synchronous, so the coroutine is driven by a
    # fresh event loop via ``asyncio.run``.
    run = asyncio.run
    run(logic())

Here is my async_session:

# Async engine for the configured database URL; echo=True turns on
# SQLAlchemy's statement logging.
engine = create_async_engine(settings.db_url, echo=True)

# Session factory bound to the engine; each call produces an AsyncSession.
async_session = sessionmaker(
    bind=engine,
    expire_on_commit=False,
    class_=AsyncSession,
)

So I get the error: '_asyncio.Task' object is not callable


Solution

  • I just did it like this:

    async def update_event() -> None:
        """Mark every active event whose ``end_time`` has passed as done.

        Issues a single UPDATE that sets ``is_done=True`` on matching
        ``Event`` rows and commits it.
        """
        async with async_session() as session:
            stmt = update(event.models.Event).where(
                event.models.Event.end_time <= datetime.now(),
                # ``is True`` would be evaluated by Python (yielding a plain
                # ``False``) instead of becoming part of the SQL;
                # ``.is_(True)`` builds the actual column comparison.
                event.models.Event.is_active.is_(True),
            ).values(is_done=True)
            await session.execute(stmt)
            # Leaving the ``async with`` block only closes the session, so
            # the UPDATE has to be committed explicitly to be persisted.
            await session.commit()
    
    
    @celery.task(name='is_event_done', bind=True, ignore_result=True)
    def is_event_done(self) -> None:
        """Celery task entry point: run ``update_event()`` to completion."""
        # ``loop`` is not defined in this snippet; presumably a module-level
        # asyncio event loop reused across task invocations — confirm.
        run_to_completion = loop.run_until_complete
        run_to_completion(update_event())
    

    And now it works fine. I hope it's a good solution; if there is anything wrong, let me know. Thanks!