I'm new to asyncio. Most asyncio code examples show parallel processing with a fixed number of tasks:
# Fixed-batch pattern: schedule one task per URL up front, then wait for all of them.
# NOTE: top-level `await` only works inside an async function (or an asyncio REPL).
tasks = [asyncio.ensure_future(download_one(url)) for url in urls]
# gather() returns only after every task in `tasks` has finished.
await asyncio.gather(*tasks)
I need to download a large number of URLs. I'm currently using aiohttp + asyncio in the way shown above to download the files in batches of, say, 16 concurrent tasks.
The problem with this approach is that `asyncio.gather` blocks (no pun intended) until the entire batch has completed, so a single slow download holds up the start of the next batch.
How can I dynamically add a new task to the queue as soon as a task is finished?
Here's my initial solution (using the producer-consumer pattern):
import asyncio, random
# URLs to process.
# NOTE(review): `....` is a placeholder for the rest of the list and is not valid
# Python as written; replace it with the real data.
# For tens of millions of URLs, consider streaming them (e.g. from a file)
# instead of holding one big list in memory.
urls = ['url1',....]
def get_url() -> str | None:
global urls
return urls.pop() if any(urls) else None
async def producer(queue: asyncio.Queue):
    """Feed URLs from ``get_url()`` into *queue* until it returns ``None``.

    When *queue* is bounded (``asyncio.Queue(maxsize)``), ``await queue.put()``
    suspends while the queue is full. That alone throttles the producer to
    the consumers' pace, so no ``queue.full()`` polling or sleeping is needed.
    """
    # Stop on the end-of-input sentinel; every real URL is enqueued.
    # (The previous `if not url:` test was inverted: it skipped real URLs and
    # enqueued `None` forever, so the producer never finished.)
    while (url := get_url()) is not None:
        await queue.put(url)
        print(f"PRODUCED: {url}")
async def consumer(queue: asyncio.Queue, process=None):
    """Take URLs from *queue* forever and process them one at a time.

    Args:
        queue: Queue of URLs to work on.
        process: Optional coroutine function called as ``await process(url)``
            to do the real work (e.g. the download). When omitted, 1-3 seconds
            of I/O are simulated with ``asyncio.sleep``.

    Every ``queue.get()`` is matched by a ``queue.task_done()``, even when
    processing fails, so that ``await queue.join()`` can return. A failure is
    reported and the worker carries on instead of dying silently. The loop
    only ends when the task is cancelled: ``CancelledError`` is a
    ``BaseException`` and is not caught below.
    """
    while True:
        url = await queue.get()
        try:
            if process is None:
                # simulate I/O operation
                await asyncio.sleep(random.randint(1, 3))
            else:
                await process(url)
        except Exception as exc:
            # Worker boundary: report and keep consuming so that one bad URL
            # does not reduce the pool's concurrency.
            print(f"FAILED: {url} ({exc!r})")
        else:
            print(f"CONSUMED: {url}")
        finally:
            queue.task_done()
async def main(concurrency: int = 3):
    """Process all URLs with at most *concurrency* items in flight.

    A single producer feeds a queue bounded to *concurrency* items, and
    *concurrency* consumer tasks drain it. A worker picks up the next URL as
    soon as it finishes its current one, instead of waiting for a whole
    batch to complete.

    Raises:
        ValueError: if *concurrency* is less than 1. ``asyncio.Queue(0)`` is
            unbounded, and zero consumers would make ``queue.join()`` hang.
    """
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")
    queue: asyncio.Queue = asyncio.Queue(concurrency)
    # fire up both the producers and the consumers
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(concurrency)]
    producers = [asyncio.create_task(producer(queue)) for _ in range(1)]
    try:
        # with both producers and consumers running, wait for
        # the producers to finish
        await asyncio.gather(*producers)
        print("---- done producing")
        # wait for the remaining queued URLs to be processed; join() returns
        # only once queue.task_done() has been called for every item put
        await queue.join()
    finally:
        # cancel the consumers (idle on the normal path, or orphaned if a
        # producer raised) and wait for them to actually exit
        for c in consumers:
            c.cancel()
        await asyncio.gather(*consumers, return_exceptions=True)
In my case the list of URLs to process is huge (tens of millions), so the producer waits for a worker to become available before pushing another URL onto the queue.