Tags: python, asynchronous, python-asyncio, threadpool, aiohttp

python asyncio parallel processing with a dynamic task queue


I'm new to asyncio. Most asyncio code examples show parallel processing with a fixed number of tasks:

tasks = [asyncio.ensure_future(download_one(url)) for url in urls]
await asyncio.gather(*tasks)

I need to download a large number of URLs. I'm currently using aiohttp + asyncio in the way shown above to download the files in batches of, say, 16 concurrent tasks.

The problem with this approach is that the await on asyncio.gather blocks (no pun intended) until the entire batch has completed, so one slow download holds up the start of the next batch.
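
For reference, my current batching loop looks roughly like this (a simplified sketch; download_one stands in for the actual aiohttp request and save logic):

import asyncio
import aiohttp

BATCH_SIZE = 16

async def download_one(session: aiohttp.ClientSession, url: str) -> bytes:
    # placeholder for the real request / save-to-disk logic
    async with session.get(url) as resp:
        return await resp.read()

async def download_all(urls: list[str]) -> None:
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(urls), BATCH_SIZE):
            batch = urls[i:i + BATCH_SIZE]
            tasks = [asyncio.ensure_future(download_one(session, url)) for url in batch]
            # the whole batch must finish before the next one starts,
            # even if 15 of the 16 downloads completed long ago
            await asyncio.gather(*tasks)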

How can I dynamically add a new task to the queue as soon as a task is finished?


Solution

  • Here's my initial solution (using consumer-producer design pattern):

    import asyncio, random
    
    urls = ['url1',....]
    
    def get_url() -> str | None:
        # return the next url, or None once the list is exhausted
        return urls.pop() if urls else None
    
    
    async def producer(queue: asyncio.Queue):
        while True:
            if queue.full():
                print(f"queue full ({queue.qsize()}), sleeping...")
                await asyncio.sleep(0.3)
                continue
    
            # get a url to fetch
            url = get_url()
            if not url:
                break
            print(f"PRODUCED: {url}")
            await queue.put(url)
            await asyncio.sleep(0.1)
    
    
    async def consumer(queue: asyncio.Queue):
        while True:
            url = await queue.get()
            # simulate I/O operation
            await asyncio.sleep(random.randint(1, 3))
            queue.task_done()
            print(f"CONSUMED: {url}")
    
    
    async def main():
        concurrency = 3
        queue: asyncio.Queue = asyncio.Queue(concurrency)
    
        # fire up both the producers and the consumers
        consumers = [asyncio.create_task(consumer(queue)) for _ in range(concurrency)]
        producers = [asyncio.create_task(producer(queue)) for _ in range(1)]
    
        # with both producers and consumers running, wait for
        # the producers to finish
        await asyncio.gather(*producers)
        print("---- done producing")
    
        # wait for the remaining tasks to be processed
        await queue.join()
    
        # cancel the consumers, which are now idle
        for c in consumers:
            c.cancel()
    
    
    asyncio.run(main())
    

    In my case, since the list of urls to process is huge (in the double-digit millions), the producer waits for workers to become available before pushing another task onto the queue.
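
    To plug real downloads into this skeleton, the simulated sleep in the consumer can be replaced with an aiohttp request. A rough sketch (retries and writing the response to disk are omitted; producer and get_url are the same as above):

    import asyncio
    import aiohttp


    async def consumer(queue: asyncio.Queue, session: aiohttp.ClientSession):
        while True:
            url = await queue.get()
            try:
                # the actual I/O: fetch the response body
                async with session.get(url) as resp:
                    data = await resp.read()
                print(f"CONSUMED: {url} ({len(data)} bytes)")
            except aiohttp.ClientError as exc:
                print(f"FAILED: {url} ({exc})")
            finally:
                # mark the item as done even if the request failed,
                # so queue.join() in main() can still return
                queue.task_done()


    async def main():
        concurrency = 3
        queue: asyncio.Queue = asyncio.Queue(concurrency)

        async with aiohttp.ClientSession() as session:
            consumers = [asyncio.create_task(consumer(queue, session)) for _ in range(concurrency)]
            producers = [asyncio.create_task(producer(queue)) for _ in range(1)]

            await asyncio.gather(*producers)
            await queue.join()

            for c in consumers:
                c.cancel()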