Tags: python, asynchronous, python-asyncio

Perform large numbers of HTTP requests asynchronously, N at a time


For a one-time task I need to go through all the records in a database (there are a few million of them), read a value from a cell, make an HTTP request, and update another cell which is currently NULL.

I want to send the requests in portions, asynchronously, via asyncio, and not too many at a time, because the remote server may ban me: no more than 50 requests per second, or 50 in flight at once.

I've found this code:

import asyncio
import aiohttp


async def one(session, url):
    # request the URL and read it until complete or canceled
    async with session.get(url) as resp:
        await resp.text()


async def fire(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(loop.create_task(one(session, url)))

        # 10 seconds
        try:
            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
        except asyncio.TimeoutError:
            pass


loop = asyncio.get_event_loop()
loop.run_until_complete(fire([urls...]))

But it sends all the requests at once.

How could I do it N at a time? Meaning: send N requests, wait for one, a few, or even all of them to return, then send another batch of N, and so on.


Solution

  • Option A: With just asyncio in batches

    Python <3.11

    from asyncio import create_task, gather, run, sleep
    
    from aiohttp import ClientSession
    
    async def get_one(session: ClientSession, url: str) -> None:
        print("Requesting", url)
        async with session.get(url) as resp:
            text = await resp.text()
            await sleep(2)  # for demo purposes
            print("Got response from", url, text.strip().split("\n", 1)[0])
    
    async def get_all(urls: list[str], num_concurrent: int) -> None:
        url_iterator = iter(urls)
        keep_going = True
        async with ClientSession() as session:
            while keep_going:
                tasks = []
                for _ in range(num_concurrent):
                    try:
                        url = next(url_iterator)
                    except StopIteration:
                        keep_going = False
                        break
                    new_task = create_task(get_one(session, url))
                    tasks.append(new_task)
                await gather(*tasks)
    
    async def main() -> None:
        urls = [
            "https://github.com",
            "https://stackoverflow.com",
            "https://python.org",
        ]
        await get_all(urls, 2)
    
    run(main())
    

    Output:

    Requesting https://github.com
    Requesting https://stackoverflow.com
    Got response from https://github.com <!DOCTYPE html>
    Got response from https://stackoverflow.com <!DOCTYPE html>
    Requesting https://python.org
    Got response from https://python.org <!doctype html>
    

    You'll notice that the third request (to python.org) is only sent after both previous requests have returned a response. This setup essentially performs your total number of requests in batches of num_concurrent.

    Python >=3.11

    With the newer TaskGroup class, we can make the get_all function a bit more concise:

    from asyncio import TaskGroup, run, sleep
    
    from aiohttp import ClientSession
    
    
    async def get_one(session: ClientSession, url: str) -> None:
        ...  # same as above
    
    
    async def get_all(urls: list[str], num_concurrent: int) -> None:
        url_iterator = iter(urls)
        keep_going = True
        async with ClientSession() as session:
            while keep_going:
                async with TaskGroup() as tg:
                    for _ in range(num_concurrent):
                        try:
                            url = next(url_iterator)
                        except StopIteration:
                            keep_going = False
                            break
                        tg.create_task(get_one(session, url))
    
    ...
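
    In either version, note that get_one only prints the response and discards its value. Since your actual task needs that value back to fill the NULL column, here is a minimal sketch of a variation (not part of the original answer; the database write itself is left out) in which get_one returns the body and get_all collects the results batch by batch:

    from asyncio import create_task, gather

    from aiohttp import ClientSession

    async def get_one(session: ClientSession, url: str) -> str:
        # Same request as above, but return the body so the caller can use it
        async with session.get(url) as resp:
            return await resp.text()

    async def get_all(urls: list[str], num_concurrent: int) -> list[str]:
        url_iterator = iter(urls)
        keep_going = True
        results: list[str] = []
        async with ClientSession() as session:
            while keep_going:
                tasks = []
                for _ in range(num_concurrent):
                    try:
                        url = next(url_iterator)
                    except StopIteration:
                        keep_going = False
                        break
                    tasks.append(create_task(get_one(session, url)))
                # gather() returns results in the order the tasks were created
                results.extend(await gather(*tasks))
        return results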
    

  • Option B: With just asyncio in a Queue

    The asyncio.Queue allows us to set a maximum size. With a fixed number of consumer tasks pulling URLs off a bounded queue, the number of consumers caps the concurrency, while maxsize keeps the producer from running too far ahead. This requires the producer-consumer pattern:

    from asyncio import Queue, create_task, gather, run, sleep
    
    from aiohttp import ClientSession
    
    async def get_one(session: ClientSession, url: str) -> None:
        ...  # same as above
    
    STOP_SENTINEL = object()
    
    async def consumer(session: ClientSession, q: Queue) -> None:
        url = await q.get()
        while url is not STOP_SENTINEL:
            await get_one(session, url)
            q.task_done()
            url = await q.get()
        q.task_done()
    
    async def main() -> None:
        urls = [
            "https://github.com",
            "https://stackoverflow.com",
            "https://python.org",
        ]
        num_concurrent = 2
        q = Queue(maxsize=num_concurrent)
        async with ClientSession() as session:
            consumers = [
                create_task(consumer(session, q))
                for _ in range(num_concurrent)
            ]
            for url in urls:
                await q.put(url)
            for _ in range(num_concurrent):
                await q.put(STOP_SENTINEL)
            await gather(*consumers)
    
    run(main())
    

    Output:

    Requesting https://github.com
    Requesting https://stackoverflow.com
    Got response from https://github.com <!DOCTYPE html>
    Requesting https://python.org
    Got response from https://stackoverflow.com <!DOCTYPE html>
    Got response from https://python.org <!doctype html>
    

    As you can see, the third request can now be sent as soon as either of the previous two returns a response.

    That may be more efficient, even though the setup is a bit more cumbersome.
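
    If you would rather not feed sentinel values into the queue yourself, a common variation of the same pattern (just a sketch, not taken from the original answer) lets each consumer loop forever and shuts everything down with Queue.join() followed by cancelling the consumers:

    from asyncio import Queue, create_task, gather, run

    from aiohttp import ClientSession

    async def get_one(session: ClientSession, url: str) -> None:
        ...  # same as above

    async def consumer(session: ClientSession, q: Queue) -> None:
        # Runs until cancelled; error handling is omitted for brevity
        while True:
            url = await q.get()
            try:
                await get_one(session, url)
            finally:
                q.task_done()

    async def main() -> None:
        urls = [
            "https://github.com",
            "https://stackoverflow.com",
            "https://python.org",
        ]
        num_concurrent = 2
        q: Queue = Queue(maxsize=num_concurrent)
        async with ClientSession() as session:
            consumers = [
                create_task(consumer(session, q))
                for _ in range(num_concurrent)
            ]
            for url in urls:
                await q.put(url)
            await q.join()  # blocks until task_done() was called for every item
            for c in consumers:
                c.cancel()  # the consumers never exit on their own
            await gather(*consumers, return_exceptions=True)

    run(main())

    The trade-off is that you have to remember to cancel the consumers, whereas the sentinel version lets them finish on their own.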


  • Option C: With an extra package

    I used to run into similar issues when a fixed number of asyncio tasks had to work through a much larger number of actual jobs. To make this easier I wrote the asyncio-taskpool package. With it I can do something like this:

    from asyncio import run, sleep
    
    from aiohttp import ClientSession
    from asyncio_taskpool import TaskPool
    
    async def get_one(session: ClientSession, url: str) -> None:
        ...  # same as above
    
    async def get_all(urls: list[str], num_concurrent: int) -> None:
        pool = TaskPool()
        async with ClientSession() as session:
            pool.starmap(
                get_one,
                ((session, url) for url in urls),
                num_concurrent=num_concurrent,
            )
            await pool.gather_and_close()
    
    async def main() -> None:
        urls = [
            "https://github.com",
            "https://stackoverflow.com",
            "https://python.org",
        ]
        await get_all(urls, 2)
    
    run(main())
    

    Output: (same as with the Queue approach)

    Requesting https://github.com
    Requesting https://stackoverflow.com
    Got response from https://github.com <!DOCTYPE html>
    Requesting https://python.org
    Got response from https://stackoverflow.com <!DOCTYPE html>
    Got response from https://python.org <!doctype html>
    

    You'll notice again that the third request is only made after at least one of the other two has returned a response.

    You can try this out with larger numbers of tasks. The number of tasks executing concurrently at any given time will never exceed the num_concurrent value passed to starmap (starmap is just a variant of map; see the sketch below).

    I tried to emulate the standard multiprocessing.Pool interface to an extent and find this more convenient to use, especially with long-running tasks.
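
    As an aside, since starmap is just a variant of map, the same pool could also be filled via map, with functools.partial binding the session up front. This is only a sketch and assumes map accepts the same num_concurrent keyword as starmap does in the example above:

    from functools import partial

    from aiohttp import ClientSession
    from asyncio_taskpool import TaskPool

    async def get_one(session: ClientSession, url: str) -> None:
        ...  # same as above

    async def get_all(urls: list[str], num_concurrent: int) -> None:
        pool = TaskPool()
        async with ClientSession() as session:
            # map passes each URL as the single positional argument,
            # so the session is bound beforehand via functools.partial
            pool.map(
                partial(get_one, session),
                urls,
                num_concurrent=num_concurrent,
            )
            await pool.gather_and_close()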