How to pause coroutines in Python?


I have a singleton-like class that wraps an aiohttp ClientSession. My code needs to make all of its requests through a single session, and I also need to recreate the session once its lifetime exceeds 15 minutes. Sometimes request A obtains the session and starts using it when its age is 899 seconds, and then request B also asks for the session, which causes it to be recreated. As a result, the first request fails with an exception and never completes. How can I avoid this?

class Session:
    def __init__(self):
        self.session = None
        loop = asyncio.get_event_loop()
        loop.create_task(self.create_session())
        self.last_created = datetime.datetime.now().timestamp()
        logger.info(f"Created session on {datetime.datetime.now()}")

    async def create_session(self):
        lock = asyncio.Lock()
        await lock.acquire()
        try:
            if self.session is not None:
                await self.session.close()
                logger.debug("Closing old session")
            self.session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=50), trust_env=True)
            payload = {
                "login": LOGIN,
                "password": hashlib.md5(PASSWORD.encode()).hexdigest()
            }
            async with self.session.post(f"{URL}/login", json=payload) as response:
                data = await response.json()
                if data['code'] == "0":
                    self.last_created = datetime.datetime.now().timestamp()
                    logger.debug(f"Logged in")
                    return True
        finally:
            lock.release()

    async def get_session(self):
        lock = asyncio.Lock()
        await lock.acquire()
        if datetime.datetime.now().timestamp() - self.last_created > 900 and not self.session._connector._conns:
            logger.debug(f"{datetime.datetime.now().timestamp() - self.last_created}, creating new session")
            await self.create_session()
            return self.session
        logger.debug(f"{datetime.datetime.now().timestamp() - self.last_created}, returning old session")
        lock.release()
        return self.session

I tried to use a Lock, but it didn't help, or I am not using it correctly.

I also tried checking session._connector._conns, but requests still sometimes fail with an exception.


Solution

  • OK, after re-reading your code and question I could figure out what you intend to do.

    As stated in the comment, the major issue, if you want to follow this design, is that you are using Lock incorrectly: the same lock object has to be shared by every piece of code that uses a resource which must not be accessed concurrently. As it is, you create a transient lock object inside each method call, and such a lock will always be acquired immediately.
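
    To make the difference concrete, here is a minimal, self-contained sketch (not taken from your code) that counts how many coroutines are inside the "critical section" at the same time. The names worker, peak_with_local_locks and peak_with_shared_lock are made up for this illustration, and asyncio.sleep stands in for closing and recreating the session.

```python
import asyncio


async def worker(lock, state):
    # Hold the lock for the whole critical section.
    async with lock:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for closing and recreating the session
        state["active"] -= 1


async def peak_with_local_locks(n=5):
    state = {"active": 0, "peak": 0}
    # Every worker gets a brand-new lock, so nobody ever waits for anybody.
    await asyncio.gather(*(worker(asyncio.Lock(), state) for _ in range(n)))
    return state["peak"]


async def peak_with_shared_lock(n=5):
    state = {"active": 0, "peak": 0}
    lock = asyncio.Lock()  # one lock object shared by every worker
    await asyncio.gather(*(worker(lock, state) for _ in range(n)))
    return state["peak"]


if __name__ == "__main__":
    print(asyncio.run(peak_with_local_locks()))  # all 5 workers overlap
    print(asyncio.run(peak_with_shared_lock()))  # only 1 worker at a time
```

    With a fresh lock per call, all five workers are inside the section together; with one shared lock, only one is there at any moment.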

    You can rearrange your code more or less like this:

    import asyncio
    import datetime
    import hashlib

    import aiohttp
    ...
    logger = ...
    ...
    
    TIMEOUT = 900  # prefer named constants over "magic numbers", defined at the beginning of the file,
                   # so that the value is (1) easy to find; (2) changed in one place for every use;
                   # and (3) given a descriptive name
    
    class Session:
        def __init__(self):
            self.session = None
            self.lock = asyncio.Lock()
            loop = asyncio.get_event_loop()
            loop.create_task(self.create_session())
            self.last_created = datetime.datetime.now().timestamp()
            logger.info(f"Created session on {datetime.datetime.now()}")
    
        async def create_session(self):
    
            async with self.lock:
                if self.session is not None:
                    await self.session.close()
                    logger.debug("Closing old session")
                self.session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=50), trust_env=True)
                payload = {
                    "login": LOGIN,
                    "password": hashlib.md5(PASSWORD.encode()).hexdigest()
                }
                async with self.session.post(f"{URL}/login", json=payload) as response:
                    data = await response.json()
                    if data['code'] == "0":
                        self.last_created = datetime.datetime.now().timestamp()
                        logger.debug(f"Logged in")
                        return True
    
    
        async def get_session(self):
            # no lock use needed here: if there is a concurrent request, it will
            # be paused in "create_session"
            if (session_age:=datetime.datetime.now().timestamp() - self.last_created) > TIMEOUT and not self.session._connector._conns:
                logger.debug(f"{session_age}, creating new session")
                await self.create_session()
                return self.session
            logger.debug(f"{session_age}, returning old session")
            return self.session
    
    

    However, while this will keep a session from being created while another one is being "get", it probably won't be of any use: whatever code obtained the session by calling get_session knows nothing about the lock in this class. As it is, a second task needing the session will go straight through, and the session held by your class will be reset while the first task is still using it.

    The way to hold the lock while the session is being used by its clients is to implement the asynchronous context manager protocol yourself, so that your Session instance can be used with the async with statement, which can then lock the session for the duration of its use. (I am not sure this would be of much use either, because it limits you to one request at a time. Maybe you can have a session pool instead, or think of another mechanism to renew sessions when needed.)

    import asyncio
    import datetime
    import hashlib

    import aiohttp
    ...
    logger = ...
    ...
    
    TIMEOUT = 900  # prefer named constants over "magic numbers", defined at the beginning of the file,
                   # so that the value is (1) easy to find; (2) changed in one place for every use;
                   # and (3) given a descriptive name
    
    class Session:
        def __init__(self):
            self.session = None
            self.lock = asyncio.Lock()
            loop = asyncio.get_event_loop()
            loop.create_task(self.create_session())
            self.last_created = datetime.datetime.now().timestamp()
            logger.info(f"Created session on {datetime.datetime.now()}")
    
        async def create_session(self):
            # lock now moved to "__aenter__" method.
            if self.session is not None:
                await self.session.close()
                logger.debug("Closing old session")
            ... # same code as in your example
    
        async def __aenter__(self):
            # hold the lock for as long as the caller is using the session;
            # it is released in "__aexit__"
            await self.lock.acquire()
            try:
                if (session_age:=datetime.datetime.now().timestamp() - self.last_created) > TIMEOUT and not self.session._connector._conns:
                    logger.debug(f"{session_age}, creating new session")
                    await self.create_session()
                else:
                    logger.debug(f"{session_age}, returning old session")
            except Exception:
                # "__aexit__" is not called when "__aenter__" fails,
                # so release the lock here instead of leaving it held
                self.lock.release()
                logger.error("could not renew session")
                raise
            return self.session
        
        async def __aexit__(self, *args):
            self.lock.release()
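
    As for "another mechanism to renew sessions", one possibility is sketched below, under stated assumptions: count how many requests are still using each session, swap in a new session when the current one expires, and only close the old one after its last user has finished. The lock then guards the swap only, so requests can still run concurrently. Everything here is hypothetical naming for illustration (RenewingSession, use, factory), and FakeSession is a stand-in so the sketch runs without a server; in real code the factory would build the aiohttp.ClientSession and perform the login.

```python
import asyncio
import time
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


class RenewingSession:
    def __init__(self, factory, max_age=900.0):
        self._factory = factory      # async callable returning a fresh session
        self._max_age = max_age
        self._lock = asyncio.Lock()  # guards the swap only, not the requests
        self._session = None
        self._created = 0.0
        self._users = {}             # session -> requests still using it
        self._retired = set()        # expired sessions waiting for their users

    @asynccontextmanager
    async def use(self):
        async with self._lock:
            expired = time.monotonic() - self._created > self._max_age
            if self._session is None or expired:
                old = self._session
                self._session = await self._factory()
                self._created = time.monotonic()
                self._users[self._session] = 0
                if old is not None:
                    if self._users[old] == 0:
                        del self._users[old]
                        await old.close()
                    else:
                        self._retired.add(old)  # close later, when unused
            session = self._session
            self._users[session] += 1
        try:
            yield session
        finally:
            self._users[session] -= 1
            if session in self._retired and self._users[session] == 0:
                self._retired.discard(session)
                del self._users[session]
                await session.close()


async def demo():
    async def factory():
        return FakeSession()

    holder = RenewingSession(factory, max_age=0.05)
    async with holder.use() as first:
        await asyncio.sleep(0.1)            # session expires while in use
        async with holder.use() as second:  # this call triggers the renewal
            assert second is not first
            assert not first.closed         # still borrowed, so kept open
    return first.closed, second.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

    In the demo, the first session is closed only after the request holding it has finished, while the replacement stays open for later callers.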