Tags: python, asynchronous, queue, python-asyncio, aiohttp

asyncio.Queue - put URLs from txt file


I need to pass URLs from a txt file to a script that fetches them with the help of aiohttp and asyncio, so I use asyncio.Queue to put the URLs from the file into a queue. The nuance is that the txt file may be very large and may not fit in memory, so I tried to use the queue's maxsize. But when the limit is less than the number of URLs, the script gets stuck. So I have 2 questions:

  1. Where is my mistake?
  2. Is there a better solution than asyncio.Queue for this case?

script.py:

import asyncio
from aiohttp import ClientSession


async def fetch(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


async def main():
    with open("urls.txt", 'r') as f:
        queue = asyncio.Queue(maxsize=5)
        for line in f:
            line = line.strip()
            await queue.put(line)
        # await queue.join() - tried this

    while not queue.empty():
        current_url = await queue.get()
        try:
            content = await fetch(current_url)
            print(f"{current_url}: {content[:15]}")
        except Exception as e:
            print(f"Error fetching {current_url}: {e}")
        finally:
            queue.task_done()

asyncio.run(main())

urls.txt:

https://www.google.com/
https://www.youtube.com/
https://www.facebook.com/
https://www.wikipedia.org/
https://www.amazon.com/
https://www.instagram.com/
https://www.twitter.com/
https://www.tumblr.com/
https://www.pinterest.com/
https://www.reddit.com/

In this script I use maxsize=5 while the number of URLs in the txt file is 10. I tried adding await queue.join() after the for line in f: loop, but it didn't help. The script only works without await queue.join() and when maxsize >= the number of URLs, or when maxsize is not specified at all.
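
The same hang can be reproduced with just the queue and no aiohttp at all (a stripped-down sketch of the symptom):

import asyncio


async def demo():
    queue = asyncio.Queue(maxsize=1)
    await queue.put("first")
    await queue.put("second")  # the script hangs here

asyncio.run(demo())  # never finishes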


Solution

  • Your program structure is wrong. await queue.put(line) suspends once the queue already holds maxsize items, and because no consumer is running yet, nothing ever takes an item out: the producer waits forever, which is why the script gets stuck. To use a Queue you need one or more consumers that read from the queue concurrently with the producer.

    Here is a simple example of how you can create 3 workers that download the content while you put the URLs into the Queue. Because the workers drain the queue as the file is read, the file is streamed line by line and the queue never holds more than maxsize URLs, so memory stays bounded. At the end we stop the workers by putting one None per worker into the queue:

    import asyncio
    
    from aiohttp import ClientSession
    
    
    async def fetch(url):
        async with ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()
    
    
    async def consumer(queue):
        while True:
            current_url = await queue.get()
            try:
                if current_url is None:
                    break
    
                content = await fetch(current_url)
                print(f"{current_url}: {content[:15].strip()}")
            except Exception as e:
                print(f"Error fetching {current_url}: {e}")
            finally:
                queue.task_done()
    
    
    async def main():
        queue = asyncio.Queue(maxsize=5)
    
        # for example, we create 3 workers consuming the queue
        workers = {asyncio.create_task(consumer(queue)) for _ in range(3)}
    
        with open("urls.txt", "r") as f:
            for line in f:
                line = line.strip()
                await queue.put(line)
    
        # stop the workers: one None sentinel per worker
        for _ in range(len(workers)):
            await queue.put(None)

        # block until every queued item (sentinels included) is marked done
        await queue.join()
    
    
    asyncio.run(main())
    

    Prints:

    https://www.google.com/: <!doctype html>
    https://www.facebook.com/: <!DOCTYPE html>
    https://www.wikipedia.org/: <!DOCTYPE html>
    https://www.amazon.com/: <!doctype html>
    https://www.instagram.com/: <!DOCTYPE html>
    https://www.tumblr.com/: <!doctype html
    https://www.youtube.com/: <!DOCTYPE html>
    https://www.twitter.com/: <!DOCTYPE html>
    https://www.pinterest.com/: <!DOCTYPE html>
    https://www.reddit.com/: <!DOCTYPE
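
  • A further note, regardless of the queue handling: fetch() opens a new ClientSession for every URL, and aiohttp recommends reusing a single session across many requests so that connections are pooled. A possible sketch of the same worker pattern with one shared session (same names as above; not the only way to structure it):

    import asyncio

    from aiohttp import ClientSession


    async def consumer(queue, session):
        while True:
            current_url = await queue.get()
            try:
                if current_url is None:
                    break

                # reuse the shared session instead of opening a new one per URL
                async with session.get(current_url) as response:
                    content = await response.text()
                print(f"{current_url}: {content[:15].strip()}")
            except Exception as e:
                print(f"Error fetching {current_url}: {e}")
            finally:
                queue.task_done()


    async def main():
        queue = asyncio.Queue(maxsize=5)

        # one session shared by all workers, closed when the block exits
        async with ClientSession() as session:
            workers = {asyncio.create_task(consumer(queue, session)) for _ in range(3)}

            with open("urls.txt", "r") as f:
                for line in f:
                    await queue.put(line.strip())

            # stop the workers
            for _ in range(len(workers)):
                await queue.put(None)

            await queue.join()


    asyncio.run(main())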