python, python-asyncio, coroutine, aiohttp

How to run multiple periodic coroutines without blocking?


I'm writing a Python script that checks the uptime of a few domains every few minutes. I run a coroutine for each website in a while loop and sleep it after each check is done. This works; however, I also want to be able to cancel the coroutines. The problem I'm having is that when I await these coroutines in asyncio.gather(), they block the thread, since they never return a result.

If I remove `await asyncio.gather(*tasks.values(), return_exceptions=True)` I get `RuntimeError: Session is closed`.

How can I run them without blocking the thread?

This is a simplified version of the code. I'm using a simple aiohttp server for testing.

Server code:

from aiohttp import web
import asyncio
import random


async def handle(request: web.Request) -> web.Response:
    await asyncio.sleep(random.randint(0, 3))
    return web.Response(text=f"Hello, from {request.rel_url.path}")


app = web.Application()
app.router.add_route('GET', '/{name}', handle)

web.run_app(app)

Uptime checker code:

import asyncio
import aiohttp


LIMIT = 2

async def check_uptime_coro(session: aiohttp.ClientSession, url: str, semaphore: asyncio.BoundedSemaphore) -> None:
    while True:
        try:
            async with semaphore:
                async with session.get(url) as response:
                    if response.status != 200:
                        print(f"error with {url} {response.status}")
                    else:
                        print(f"success with {url}")
            await asyncio.sleep(5)
        except Exception as e:
            print(f"error with {url} {e}")


async def main() -> None:
    urls = [f"http://localhost:8080/{x}" for x in range(0, 10)]
    tasks = {}
    semaphore = asyncio.BoundedSemaphore(LIMIT)
    try:
        async with aiohttp.ClientSession() as session:
            for url in urls:
                tasks[url] = asyncio.create_task(
                    check_uptime_coro(session, url, semaphore))

            await asyncio.gather(*tasks.values(), return_exceptions=True)

        print("This doesn't print!")
    except Exception as e:
        print(f"error! {e}")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("This also doesn't print!")


Solution

  • You have to understand that your call to asyncio.gather is not "blocking the thread" - it is, instead, blocking the task. If you want to run other things while the checker tasks are running and the web session is open, simply fire the gather that synchronizes those tasks in a different task than the one you want to run things on.

    If the tasks will never return, you don't actually even need to call gather on them - just keep references to all the tasks in a set or other container, so they are not garbage-collected and can be properly cancelled when the time comes.

    Other than that, the sole reason you get the error you report when not awaiting the tasks with gather is that execution falls off the end of the with statement block that opens the session - the session is closed while the checker tasks are still trying to use it.

    You could avoid the with block altogether and call the session's __aenter__ and __aexit__ methods manually - but you can also simply rewrite things so that the with block lives in a separate task, as mentioned above. In Python 3.11 you can use task groups instead: they work better than gather and cancel all checker tasks when the parent task itself is cancelled (a sketch of that variant follows the code below).

    import asyncio
    import aiohttp
    
    
    LIMIT = 2
    
    async def check_uptime_coro(session: aiohttp.ClientSession, url: str, semaphore: asyncio.BoundedSemaphore) -> None:
        while True:
            try:
                async with semaphore:
                    async with session.get(url) as response:
                        if response.status != 200:
                            print(f"error with {url} {response.status}")
                        else:
                            print(f"success with {url}")
                await asyncio.sleep(5)
            except Exception as e:
                print(f"error with {url} {e}")
    
    async def check_uptime_master() -> None:
        # Owns the client session and the checker tasks, so they live and die together.
        urls = [f"http://localhost:8080/{x}" for x in range(0, 10)]
        tasks = {}
        semaphore = asyncio.BoundedSemaphore(LIMIT)
        async with aiohttp.ClientSession() as session:
            for url in urls:
                tasks[url] = asyncio.create_task(
                    check_uptime_coro(session, url, semaphore))

            # This gather never finishes, which keeps the session open
            # for as long as the checker tasks keep running.
            await asyncio.gather(*tasks.values(), return_exceptions=True)
    
    async def main() -> None:
        try:
            checker_task = asyncio.create_task(check_uptime_master())
            await asyncio.sleep(0)  # yield once so the event loop can fire up the subtasks
            print("This now does print!")
        except Exception as e:
            print(f"error! {e}")
        # Go on with your code in the main task. Don't forget to yield to the loop
        # (await something) so the subtasks can run!
        ...
    
    
    if __name__ == "__main__":
        asyncio.run(main())  # the recommended way to start the asyncio loop nowadays
        # loop = asyncio.get_event_loop()  # <- obsolete
        # loop.run_until_complete(main())  # <- obsolete
        print("This also doesn't print - and won't until you write code that explicitly cancels the `checker_task` above")