Search code examples
pythonpython-3.xsqlalchemybackground-process

How can I make the SQLAlchemy async session (AsyncSession) generator class-based?


I have a FastAPI application that runs a scheduled task in a background thread, started from a FastAPI startup event. When I use the SQLAlchemy async session inside a route, e.g. session: AsyncSession=Depends(instance_manger.db_instance.get_db_session), it works correctly, but when the same code runs in the background thread I get the error below. I use these Python packages: SQLAlchemy[asyncio], asyncmy, pymysql, fastapi.

database.py

class DBManager:
    """Own the async SQLAlchemy engine and hand out task-scoped sessions.

    Every attribute is ``None`` until ``initialize`` has been called, so
    ``initialize`` must run before ``get_db_session`` is used.
    """

    def __init__(self):
        self.SQLALCHEMY_DATABASE_URL = None
        self.config_reader_instance = None
        self.engine = None
        self._session_factory = None
        self.logger_handler_instance = None
        self.db = None

    def initialize(self, config_reader_instance, logger_handler_instance):
        """Build the engine and the session registry from the configuration.

        Args:
            config_reader_instance: Object exposing a ``DB_INFO`` mapping with
                the keys ``db_username``, ``db_password``, ``db_hostname``,
                ``db_port`` and ``db_name``.
            logger_handler_instance: Object exposing
                ``write_log(name, level, message)``.
        """
        self.logger_handler_instance = logger_handler_instance
        self.config_reader_instance = config_reader_instance
        db_info = self.config_reader_instance.DB_INFO
        self.SQLALCHEMY_DATABASE_URL = "mysql+asyncmy://{0}:{1}@{2}:{3}/{4}".format(
            db_info['db_username'], db_info['db_password'],
            db_info['db_hostname'], db_info['db_port'],
            db_info['db_name'])
        self.engine = create_async_engine(self.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
        # The registry is keyed by asyncio.current_task(), i.e. one session
        # per running task.
        self._session_factory = async_scoped_session(
            sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False),
            scopefunc=current_task)

    async def get_db_session(self) -> AsyncSession:
        """Yield the current task's session and clean it up afterwards.

        This is an async generator (it yields exactly one ``AsyncSession``),
        which is the shape FastAPI's ``Depends`` expects; it is not a
        coroutine returning a session.

        Raises:
            Exception: Any exception thrown into the generator is logged,
                the session is rolled back, and the exception is re-raised.
        """
        async with self._session_factory() as session:
            try:
                yield session
            except Exception as e:
                self.logger_handler_instance.write_log(__name__, logging.FATAL,
                                                       'Session rollback because of exception')
                self.logger_handler_instance.write_log(__name__, logging.FATAL, e)
                await session.rollback()
                raise
            finally:
                # remove() closes the session AND drops the registry entry
                # keyed by the current task. Closing alone leaves the task and
                # its session referenced by the registry indefinitely.
                await self._session_factory.remove()

background_thread.py

class BackgroundRunnable:
    """Background job that queries the coin service outside a request."""

    def __init__(self):
        self.instance_manger = None
        self.core_process_instance = None
        self.conf_reader_instance = None
        self.process_id = None
        self.process_name = "BTC"

    def initialize(self, instance_manager: InstanceManager):
        """Store the instance manager and return ``self`` for chaining."""
        self.instance_manger = instance_manager
        return self

    def set_process_info(self, process_name):
        """Record the current OS process id together with *process_name*."""
        self.process_id = os.getpid()
        self.process_name = process_name

    async def run_main(self):
        """Log the start, run one coin lookup and print its result."""
        self.instance_manger.logger_handler_instance.write_log(__name__, logging.INFO,
                                                               "Background Thread is start")
        # Open the session as a context manager so it is closed (and its
        # connection released) even when the lookup raises; the previous
        # inline AsyncSession(...) was never closed.
        async with AsyncSession(self.instance_manger.db_instance.engine) as session:
            results = await CryptoCoinService(CryptoCoinRepository(session)).get_coin()
            print(results)

crypto_coin_repository.py

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import class_mapper

from db.models.models import CryptoCoinModel


class CryptoCoinRepository:
    """Data-access helper around an ``AsyncSession`` for ``CryptoCoinModel``."""

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    async def get_all(self) -> bool:
        """Return True when a row with ``_symbol == 'BTC'`` exists, else False."""
        results = await self.session.execute(
            select(CryptoCoinModel._id).where(CryptoCoinModel._symbol == 'BTC'))
        # fetchone() returns None (not an empty row) when nothing matches, so
        # test for None instead of calling __len__ on the result.
        return results.fetchone() is not None

main.py

from fastapi import APIRouter, Depends, Request, Response, FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession

from coin_server.background_thread import BackgroundRunnable
from coin_server.core_process import CoreProcess
from core.instance_manager import InstanceManager
from db.database import DBManager
from db.repository.crypto_coin_repository import CryptoCoinRepository
from db.services.crypto_coin_service import CryptoCoinService

# Router that carries the deposit endpoints.
deposit_Router = APIRouter()

# Shared application services, built once at import time.
instance_manager = InstanceManager()
instance_manager.initialize()

# Database manager wired to the config reader and logger owned by the
# instance manager.
db_instance = DBManager()
db_instance.initialize(
    config_reader_instance=instance_manager.config_reader_instance,
    logger_handler_instance=instance_manager.logger_handler_instance,
)


@deposit_Router.post('/')
async def index(request: Request, session: AsyncSession = Depends(db_instance.get_db_session)):
    """Run a coin lookup with the injected session and print the result."""
    repository = CryptoCoinRepository(session)
    service = CryptoCoinService(repository)
    results = await service.get_coin()
    print(results)

deposit_app = FastAPI()


@deposit_app.on_event('startup')
async def app_startup():
    """Start the background runner as an asyncio task when the app boots."""
    # Local import: the import block of this module does not bring asyncio
    # into scope.
    import asyncio

    background_runnable = BackgroundRunnable()
    background_runnable.initialize(instance_manager)
    # The event loop keeps only a weak reference to tasks, so hold a strong
    # one on the application state; otherwise the task may be garbage
    # collected before it finishes.
    deposit_app.state.background_task = asyncio.create_task(background_runnable.run_main())


deposit_app.include_router(deposit_Router)

When I run the FastAPI app, I get the error shown in the output below.

INFO:     Uvicorn running on http://0.0.0.0:5000 (Press CTRL+C to quit)
INFO:     Started reloader process [176] using watchgod
INFO:     Started server process [179]
INFO:     Waiting for application startup.
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<BackgroundRunnable.run_main() done, defined at /mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py:48> exception=At
tributeError("'async_generator' object has no attribute 'execute'")>
Traceback (most recent call last):
  File "/mnt/c/Users/dr_r00t3r/Desktop/main/coin_server/background_thread.py", line 51, in run_main
    results = await CryptoCoinService(
  File "/mnt/c/Users/dr_r00t3r/Desktop/main/db/repository/crypto_coin_repository.py", line 17, in get_all
    results = await self.session.execute(
AttributeError: 'async_generator' object has no attribute 'execute'
INFO:     Application startup complete.


Solution

  • Note: when the get_db_session function in database.py is used as a plain generator, the session is not closed automatically, so you have to close it manually.

    database.py

    import logging
    
    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker, scoped_session
    
    # Module-level declarative base; DBManager.init_models drops and creates
    # tables from this object's metadata.
    Base = declarative_base()
    
    
    class DBManager:
        """Holds the async engine and the session registry for the application.

        The configuration-dependent attributes start out as ``None`` and are
        filled in by ``initialize``.
        """

        def __init__(self):
            self.logger_handler_instance = None
            self.config_reader_instance = None
            self.SQLALCHEMY_DATABASE_URL = None
            self.engine = None
            self.session_factory = None
            self.Base = declarative_base()

        def initialize(self, config_reader_instance, logger_handler_instance):
            """Create the engine and session registry from the configuration.

            Args:
                config_reader_instance: Object exposing a ``DB_INFO`` mapping
                    with the keys ``db_username``, ``db_password``,
                    ``db_hostname``, ``db_port`` and ``db_name``.
                logger_handler_instance: Logger wrapper; ``get_db_session``
                    calls its ``log(name, level, message)`` method.
            """
            self.logger_handler_instance = logger_handler_instance
            self.config_reader_instance = config_reader_instance

            db_info = self.config_reader_instance.DB_INFO
            url_template = "mysql+asyncmy://{0}:{1}@{2}:{3}/{4}"
            self.SQLALCHEMY_DATABASE_URL = url_template.format(
                db_info['db_username'],
                db_info['db_password'],
                db_info['db_hostname'],
                db_info['db_port'],
                db_info['db_name'],
            )

            engine_options = dict(
                pool_pre_ping=True,
                pool_size=30,
                max_overflow=30,
                echo_pool=True,
                echo=False,
                pool_recycle=3600,  # recycle every hour
            )
            self.engine = create_async_engine(self.SQLALCHEMY_DATABASE_URL, **engine_options)

            # Replaces the class-level ``Base`` with a fresh declarative base.
            DBManager.Base = declarative_base()

            make_session = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False)
            self.session_factory = scoped_session(make_session)

        def get_db_session(self):
            """Yield one session from the registry, then call ``close()`` on it.

            An exception thrown into the generator is logged twice at FATAL
            level, ``rollback()`` is called on the session, and the exception
            is re-raised; ``close()`` is called in every case.

            NOTE(review): ``rollback()`` and ``close()`` are called without
            ``await``. Because the factory builds ``AsyncSession`` objects,
            these calls presumably only create coroutine objects -- confirm
            whether the session is really rolled back and closed here.
            """
            db_session = self.session_factory()
            try:
                yield db_session
            except Exception as exc:
                self.logger_handler_instance.log(
                    __name__, logging.FATAL, 'Session rollback because of exception')
                self.logger_handler_instance.log(__name__, logging.FATAL, exc)
                db_session.rollback()
                raise
            finally:
                db_session.close()

        async def init_models(self):
            """Drop and then recreate every table known to the module-level ``Base``."""
            async with self.engine.begin() as connection:
                for schema_step in (Base.metadata.drop_all, Base.metadata.create_all):
                    await connection.run_sync(schema_step)
    

    background_thread.py

    class BackgroundRunnable:
        """Background job that queries the coin service outside a request."""

        def __init__(self):
            self.instance_manger = None
            self.core_process_instance = None
            self.conf_reader_instance = None
            self.process_id = None
            self.process_name = "BTC"

        def initialize(self, instance_manager: InstanceManager):
            """Store the instance manager and return ``self`` for chaining."""
            self.instance_manger = instance_manager
            return self

        def set_process_info(self, process_name):
            """Record the current OS process id together with *process_name*."""
            self.process_id = os.getpid()
            self.process_name = process_name

        async def run_main(self):
            """Log the start, look up the configured BTC coin and print the result.

            The session is taken from ``db_instance.get_db_session()`` and is
            always closed before this coroutine returns or raises.
            """
            self.instance_manger.logger_handler_instance.write_log(__name__, logging.INFO,
                                                                   "Background Thread is start")
            # Keep the generator in a local name so its cleanup runs at a
            # well-defined point instead of whenever the temporary returned by
            # get_db_session() is garbage-collected.
            session_source = self.instance_manger.db_instance.get_db_session()
            self.session: AsyncSession = next(session_source)
            try:
                results = await CryptoCoinService(CryptoCoinRepository(self.session)).get_coin(
                    self.instance_manger.config_reader_instance.BTC_INFO['BTC_COIN'])
                print(results)
            finally:
                # Outside of a FastAPI dependency nothing closes the session
                # for us; AsyncSession.close() is a coroutine and has to be
                # awaited to release the connection.
                await self.session.close()
                # Let the generator run its own cleanup now.
                session_source.close()
    

    crypto_coin_repository.py

    """Repositories module."""
    from contextlib import AbstractContextManager
    from typing import Callable
    
    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.orm import class_mapper, Session
    
    from db.models.models import CryptoCoinModel
    
    
    class CryptoCoinRepository:
        """Data-access helper around an ``AsyncSession`` for ``CryptoCoinModel``."""

        def __init__(self, session: AsyncSession) -> None:
            self.session = session

        async def get_all(self, coin) -> bool:
            """Report whether at least one row has ``_symbol`` equal to *coin*."""
            query_result = await self.session.execute(
                select(CryptoCoinModel._id).where(CryptoCoinModel._symbol == coin))
            rows = query_result.fetchall()
            return len(rows) != 0

        def serialize(self, model):
            """Transforms a model into a dictionary which can be dumped to JSON."""
            # Collect the mapped column keys of the model's class first ...
            column_keys = [column.key for column in class_mapper(model.__class__).columns]
            # ... then read each value from the underscore-prefixed attribute.
            return {key: getattr(model, '_' + key) for key in column_keys}
    
    
    class NotFoundError(Exception):
        """Base error for a coin that is missing from the database.

        The symbol shown in the message comes from the ``symbol`` argument
        when one is given, otherwise from the ``symbol`` class attribute,
        which subclasses may override.

        Args:
            symbol: Optional coin symbol to report in the message.
        """

        # Fallback used in the message when no symbol is supplied. The
        # original read a misspelled, never-defined attribute (``_symobl``)
        # and therefore failed with AttributeError on every instantiation.
        symbol: str = 'Coin'

        def __init__(self, symbol=None):
            if symbol is not None:
                self.symbol = symbol
            super().__init__(f'{self.symbol} not found,please add this coin to db')
    
    
    class CryptoCoinNotFoundError(NotFoundError):
        """Crypto-coin flavour of ``NotFoundError``; adds no behaviour of its own."""
    

    main.py

    from fastapi import APIRouter, Depends, Request, Response, FastAPI, status
    from fastapi.responses import JSONResponse
    from sqlalchemy.ext.asyncio import AsyncSession
    
    from coin_server.background_thread import BackgroundRunnable
    from coin_server.core_process import CoreProcess
    from core.instance_manager import InstanceManager
    from db.database import DBManager
    from db.repository.crypto_coin_repository import CryptoCoinRepository
    from db.services.crypto_coin_service import CryptoCoinService
    
    # Router that carries the deposit endpoints.
    deposit_Router = APIRouter()

    # Shared application services, built once at import time.
    instance_manager = InstanceManager()
    instance_manager.initialize()

    # Database manager wired to the config reader and logger owned by the
    # instance manager.
    db_instance = DBManager()
    db_instance.initialize(
        config_reader_instance=instance_manager.config_reader_instance,
        logger_handler_instance=instance_manager.logger_handler_instance,
    )
    
    
    @deposit_Router.post('/')
    async def index(request: Request, session: AsyncSession = Depends(db_instance.get_db_session)):
        """Run a coin lookup with the injected session and print the result.

        NOTE(review): ``get_coin()`` is called without arguments here, whereas
        ``BackgroundRunnable.run_main`` passes a coin symbol -- confirm the
        service signature.
        """
        repository = CryptoCoinRepository(session)
        service = CryptoCoinService(repository)
        results = await service.get_coin()
        print(results)
    
    deposit_app = FastAPI()


    @deposit_app.on_event('startup')
    async def app_startup():
        """Start the background runner as an asyncio task when the app boots."""
        # Local import: the import block of this module does not bring asyncio
        # into scope.
        import asyncio

        background_runnable = BackgroundRunnable()
        background_runnable.initialize(instance_manager)
        # The event loop keeps only a weak reference to tasks, so hold a
        # strong one on the application state; otherwise the task may be
        # garbage collected before it finishes.
        deposit_app.state.background_task = asyncio.create_task(background_runnable.run_main())


    deposit_app.include_router(deposit_Router)