python, python-asyncio, httpx

How to get asyncio.gather() with return_exceptions=True to complete the queue of tasks when the exception count exceeds the worker count?


I'm using asyncio in concert with httpx.AsyncClient for the first time and trying to figure out how to complete my list of tasks when some number of them may fail. I'm using a pattern I found in a few places: I populate an asyncio Queue with coroutines and have a set of workers process that queue from inside asyncio.gather. Normally, if the function doing the work raises an exception, the whole script fails during that processing and reports the exception along with a RuntimeWarning: coroutine foo was never awaited, indicating that the list was never finished.

I found the return_exceptions option for asyncio.gather, and that has helped, but not completely. My script still dies after I've hit as many exceptions as there are workers in my call to gather. The following simple script demonstrates the problem.

from httpx import AsyncClient, Timeout
from asyncio import run, gather, Queue as asyncio_Queue
from random import choice


async def process_url(client, url):
    """
    opens the URL and pulls a header attribute
    randomly raises an exception to demonstrate my problem
    """
    if choice([True, False]):
        await client.get(url)
        print(f'retrieved url {url}')
    else:
        raise AssertionError(f'generated error for url {url}')


async def main(worker_count, urls):
    """
    orchestrates the workers that call process_url
    """
    httpx_timeout = Timeout(10.0, read=20.0)
    async with AsyncClient(timeout=httpx_timeout, follow_redirects=True) as client:
        tasks = asyncio_Queue(maxsize=0)
        for url in urls:
            await tasks.put(process_url(client, url))

        async def worker():
            while not tasks.empty():
                await tasks.get_nowait()

        results = await gather(*[worker() for _ in range(worker_count)], return_exceptions=True)
        return results

if __name__ == '__main__':
    urls = ['https://stackoverflow.com/questions',
            'https://stackoverflow.com/jobs',
            'https://stackoverflow.com/tags',
            'https://stackoverflow.com/users',
            'https://www.google.com/',
            'https://www.bing.com/',
            'https://www.yahoo.com/',
            'https://www.foxnews.com/',
            'https://www.cnn.com/',
            'https://www.npr.org/',
            'https://www.opera.com/',
            'https://www.mozilla.org/en-US/firefox/',
            'https://www.google.com/chrome/',
            'https://www.epicbrowser.com/'
            ]
    print(f'processing {len(urls)} urls')
    run_results = run(main(4, urls))
    print('\n'.join([str(rr) for rr in run_results]))

One run of this script outputs:

processing 14 urls
retrieved url https://stackoverflow.com/tags
retrieved url https://stackoverflow.com/jobs
retrieved url https://stackoverflow.com/users
retrieved url https://www.bing.com/
generated error for url https://stackoverflow.com/questions
generated error for url https://www.foxnews.com/
generated error for url https://www.google.com/
generated error for url https://www.yahoo.com/
sys:1: RuntimeWarning: coroutine 'process_url' was never awaited

Process finished with exit code 0

Here you see that we got through 8 of the 14 urls, but once we hit 4 errors (one per worker, so no workers were left running), the script wrapped up and ignored the rest of the urls.

What I want is for the script to complete the full set of urls but inform me of the errors at the end. Is there a way to do this here? It may be that I'll have to wrap everything in process_url() inside a try/except block and use something like aiofile to dump them out at the end?

Update: To be clear, this demo script is a simplification of what I'm really doing. My real script hits a small number of server API endpoints a few hundred thousand times. The purpose of the set of workers is to avoid overwhelming the server I'm hitting [it's a test server, not production, so it isn't meant to handle huge volumes of requests, though the number is greater than 4 8-)]. I'm open to learning about alternatives.


Solution

  • The program design you have outlined should work OK, but you must prevent the tasks (instances of your worker function) from crashing. The listing below shows one way to do that.

    Your Queue is named "tasks", but the items you place in it aren't tasks; they are coroutines. As it stands, your program has five tasks: one of them is the main function, which is made into a task by asyncio.run(). The other four tasks are instances of worker, which are made into tasks by asyncio.gather().
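
    To make that distinction concrete, here is a tiny illustrative sketch (the names are made up for demonstration): calling a coroutine function only creates a coroutine object, which runs when something awaits it, whereas asyncio.create_task() wraps a coroutine in a Task that the event loop schedules right away.

    import asyncio

    async def double(n):
        return n * 2

    async def demo():
        coro = double(1)                       # a coroutine object; nothing is running yet
        task = asyncio.create_task(double(2))  # a Task; the event loop schedules it immediately
        print(await coro, await task)          # awaiting is what actually runs/collects them

    asyncio.run(demo())  # prints: 2 4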

    When worker awaits on a coroutine and that coroutine crashes, the exception is propagated into worker at the await statement. Because the exception isn't handled, worker will crash in turn. To prevent that, do something like this:

    async def worker():
        while not tasks.empty():
            try:
                await tasks.get_nowait()
            except Exception:
                # You might want to do something more intelligent here
                # (logging, perhaps) rather than simply suppressing the exception.
                pass

    This should allow your example program to run to completion.
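
    If you also want the failures reported at the end rather than silently suppressed, one option is to have each worker collect the exceptions it catches and return them, so they come back through gather. A minimal sketch of that idea (only the worker and the gather call change; the rest of your main stays the same, and return_exceptions=True is no longer strictly needed because the workers themselves no longer raise):

    async def worker():
        errors = []
        while not tasks.empty():
            try:
                await tasks.get_nowait()
            except Exception as exc:
                errors.append(exc)   # keep going, but remember what failed
        return errors

    per_worker_errors = await gather(*[worker() for _ in range(worker_count)])
    all_errors = [err for errors in per_worker_errors for err in errors]  # flatten for reporting

    And since you mentioned being open to alternatives: another common way to cap the number of requests in flight, without a queue or worker pool at all, is an asyncio.Semaphore. A rough sketch under that approach (the function names here are only illustrative):

    from asyncio import Semaphore, gather

    async def bounded_fetch(sem, client, url):
        async with sem:                  # at most `limit` requests run concurrently
            try:
                await client.get(url)
                return url, None         # success
            except Exception as exc:
                return url, exc          # record the failure instead of raising

    async def fetch_all(client, urls, limit):
        sem = Semaphore(limit)
        return await gather(*(bounded_fetch(sem, client, url) for url in urls))

    (Note that this builds one coroutine per url up front, so for a few hundred thousand requests your queue-plus-workers design may still be the more economical choice.)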