Use AsyncSession in celery tasks
I use FastAPI and SQLAlchemy, and I need to create a Celery task that queries the database and checks whether any rows of my Event table have end_time < datetime.now().
Here is my code:
@asynccontextmanager
async def scoped_session():
    """Yield an ``AsyncSession`` scoped to the currently running asyncio task.

    A fresh ``async_scoped_session`` registry is built around the module-level
    ``async_session`` factory on every call. The session is closed when the
    ``async with`` body exits, and the registry entry for the current task is
    removed afterwards, whether or not the body raised.

    Yields:
        The session obtained from the scoped registry.
    """
    # Pass the function itself, not the result of calling it: the registry
    # invokes ``scopefunc()`` to obtain its key. Handing it a Task object is
    # what raises "'_asyncio.Task' object is not callable".
    scoped_factory = async_scoped_session(
        async_session,
        scopefunc=asyncio.current_task,
    )
    try:
        async with scoped_factory() as s:
            yield s
    finally:
        # ``remove()`` is a method of the registry, not of the session that
        # calling the registry returns.
        await scoped_factory.remove()
async def logic():
    """Print ``is_event_done`` for every Event whose ``end_time`` has passed.

    Selects all ``Event`` rows with ``end_time <= datetime.now()`` through a
    task-scoped session and prints the ``is_event_done`` attribute of each.
    """
    async with scoped_session() as session:
        stmt = select(event.models.Event).where(
            event.models.Event.end_time <= datetime.now()
        )
        results = await session.execute(stmt)
        # execute() yields Row tuples wrapping the entity; scalars() unwraps
        # each row so the attribute is read from the Event object itself.
        for res in results.scalars():
            print(res.is_event_done)
@celery.task(bind=True, ignore_result=True, name='is_event_done')
def is_event_done(self):
    """Celery entry point that runs the async ``logic`` coroutine to completion.

    ``asyncio.run`` drives the coroutine from this synchronous task body.
    The task is bound (``self`` is the task instance) and its result is
    not stored.
    """
    coroutine = logic()
    asyncio.run(coroutine)
Here is my async_session:
# Async engine for the configured database URL; echo=True turns on SQL logging.
engine = create_async_engine(settings.db_url, echo=True)

# Factory producing AsyncSession objects bound to the engine above.
# expire_on_commit=False keeps loaded attributes usable after a commit.
async_session = sessionmaker(
    bind=engine,
    expire_on_commit=False,
    class_=AsyncSession,
)
With this code I get the error: '_asyncio.Task' object is not callable
I ended up doing it like this:
async def update_event() -> None:
    """Mark every active Event whose ``end_time`` has passed as done.

    Issues a single UPDATE that sets ``is_done=True`` on rows with
    ``end_time <= datetime.now()`` and ``is_active`` true, then commits it.
    """
    async with async_session() as session:
        stmt = update(event.models.Event).where(
            event.models.Event.end_time <= datetime.now(),
            # ``is True`` is evaluated by Python and collapses to the constant
            # False; ``.is_(True)`` builds the SQL predicate instead.
            event.models.Event.is_active.is_(True),
        ).values(is_done=True)
        await session.execute(stmt)
        # Without an explicit commit the UPDATE is discarded when the
        # session is closed at the end of the ``async with`` block.
        await session.commit()
@celery.task(bind=True, ignore_result=True, name='is_event_done')
def is_event_done(self) -> None:
    """Celery entry point that runs ``update_event`` to completion.

    The coroutine is driven on ``loop``, which is not defined in this
    snippet and is presumably a module-level event loop (confirm where it
    is created).
    """
    pending_update = update_event()
    loop.run_until_complete(pending_update)
Now it works fine. I hope this is a good solution; if there is anything wrong, let me know. Thanks!