Run code after asyncio run_until_complete() statement has finished

I am fairly new to asyncio and I managed to do some requests with it. I made a function fetch_all() that takes in a list of the queries (URLs) and the loop previously created with asyncio as arguments, and calls the function fetch() that gets the result of each query in JSON format:

import aiohttp
import asyncio
import ssl
import nest_asyncio

async def fetch(session, url):
    """GET `url` through `session` and return the response body decoded as JSON.

    A new ssl.SSLContext() is created for every call and passed to the request.
    """
    request = session.get(url, ssl=ssl.SSLContext())
    async with request as response:
        payload = await response.json()
    return payload

async def fetch_all(urls, loop):
    """Fetch every url in `urls` concurrently over a single client session.

    The session is created on `loop`. gather() is called with
    return_exceptions=True, so an exception raised by one fetch() call is
    returned in that call's position instead of being raised here.
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        pending = (fetch(session, url) for url in urls)
        return await asyncio.gather(*pending, return_exceptions=True)

if __name__ == '__main__':
    # Get the event loop once; the same loop is also handed to fetch_all().
    loop = asyncio.get_event_loop()
    # run_until_complete() drives the loop until fetch_all() has finished and
    # hands back its return value. `queries` (the list of URLs) is not defined
    # in this snippet.
    results = loop.run_until_complete(fetch_all(queries, loop))

This works properly, and I get the results of the queries in results as a list of JSONs (dictionaries). But here goes my problem: sometimes, instead of the JSON, I get an error for some results (RuntimeError, aiohttp.client_exceptions.ClientConnectorError, etc.). I guess these are one-time errors, since if I redo the query individually I get the desired result. Hence, I came up with a while loop to check which results are not dictionaries and redo their queries: I initialize repeat_queries, error_index and results with the queries and their indices, and apply run_until_complete(). Then I save each result that is a dictionary and update the list of the queries that are left and their indices:

# Retry driver: keep re-running the failed queries until every slot in
# `results` holds a dict.
# NOTE(review): there is no cap on the number of passes, so a query that never
# yields a dict keeps this loop going forever.
repeat_queries = queries
error_index = list(range(len(repeat_queries)))
# NOTE(review): `results` is bound to the SAME list object as `error_index`
# here (no copy is made).
results = error_index

while error_index:
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        # The assignment below only happens once run_until_complete() has
        # returned the value produced by fetch_all().
        repeat_results = loop.run_until_complete(fetch_all(repeat_queries, loop))
    # Store each new result in the slot of the query it belongs to.
    for i, rr in zip(error_index, repeat_results):
        results[i] = rr
    # Positions in the full `results` list that still do not hold a dict.
    error_index = [i for i in range(len(results)) if not isinstance(results[i], dict)]
    # NOTE(review): `error_index` holds positions in the full list, but from
    # the second pass on `repeat_queries` is the shortened retry list, so this
    # lookup can pick the wrong URL or raise IndexError. Indexing `queries`
    # looks like what was intended - confirm.
    repeat_queries = [repeat_queries[i] for i in error_index]

However, since the asyncio loop is asynchronous, error_index and repeat_queries updates are executed before run_until_complete() is done, and the loop is continuously running with queries that were already cast in the previous iterations, resulting in an (almost) infinite while loop.

Therefore, my question is:
Is there any way to force some code to be executed after loop.run_until_complete() has finished?
I have seen some similar questions in stackoverflow but I haven't been able to apply any of their answers.


  • I would do this in a different way.

    I would run a loop inside fetch() with try/except to catch the exception and repeat the request.

    Because some problems may never go away, a while-loop could run forever - so I would rather use for _ in range(3) to try only three times.

    I would also return the url from fetch() so it is easier to see which urls did not give a result.

    import aiohttp
    import asyncio
    import ssl
    async def fetch(session, url):
        """Fetch `url` and decode the JSON body, trying up to three times.

        Returns a `(url, data)` tuple on the first successful attempt. If all
        three attempts raise, returns `(url, exception)` with the exception
        from the last attempt, so the caller can always unpack a pair.
        """
        exception = None
        for number in range(3):  # try only 3 times
            try:
                async with session.get(url, ssl=ssl.SSLContext()) as response:
                    data = await response.json()
                    return url, data
            except Exception as ex:
                # Best effort: report the failed attempt, remember the error
                # and fall through to the next attempt.
                print('[ERROR] {} | {} | {}'.format(url, number+1, ex))
                exception = ex
        return url, exception
    async def fetch_all(urls, loop):
        """Run fetch() for every url concurrently over one shared session.

        The session is created on `loop`. gather() is called with
        return_exceptions=True, so an exception escaping a fetch() call is
        returned in that call's position rather than raised.
        """
        async with aiohttp.ClientSession(loop=loop) as session:
            pending = [fetch(session, url) for url in urls]
            return await asyncio.gather(*pending, return_exceptions=True)
    queries = [
        'https://fake.domain/',
        # NOTE(review): the sample output shows three results, but only this
        # URL is still readable in the listing - add the other two back.
    ]
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        # Blocks until every query has been fetched (or has given up).
        results = loop.run_until_complete(fetch_all(queries, loop))
        print('--- results ---')
        # Each entry is unpacked as a (url, result) pair; result is either the
        # decoded JSON or the exception that was returned for that url.
        for url, result in results:
            print('url:', url)
            print('result:', result)
            print('is dict:', isinstance(result, dict))
            print('type:', type(result))


    [ERROR] https://fake.domain/ | 1 | Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc2c0> [Name or service not known]
    [ERROR] https://fake.domain/ | 2 | Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc440> [Name or service not known]
    [ERROR] https://fake.domain/ | 3 | Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc9c0> [Name or service not known]
    [ERROR] | 1 | 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('')
    [ERROR] | 2 | 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('')
    [ERROR] | 3 | 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('')
    --- results ---
    result: {'args': {}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': '', 'User-Agent': 'Python/3.8 aiohttp/3.7.4.post0', 'X-Amzn-Trace-Id': 'Root=1-60e5c00e-45aae85e78277e5122b262c9'}, 'origin': '', 'url': ''}
    is dict: True
    type: <class 'dict'>
    result: 0, message='Attempt to decode JSON with unexpected mimetype: text/html', url=URL('')
    is dict: False
    type: <class 'aiohttp.client_exceptions.ContentTypeError'>
    url: https://fake.domain/
    result: Cannot connect to host fake.domain:443 ssl:<ssl.SSLContext object at 0x7f3902afc9c0> [Name or service not known]
    is dict: False
    type: <class 'aiohttp.client_exceptions.ClientConnectorError'>


    Here is a version which uses your method of looping over run_until_complete(), but I would do everything in one for-loop.

    And I would use for _ in range(3) to repeat it only three times.

    This works, but the previous version seems much simpler.

    import aiohttp
    import asyncio
    import ssl
    async def fetch(session, url):
        """Request `url` via `session` and return its JSON-decoded body.

        Each call builds its own ssl.SSLContext() for the request.
        """
        tls = ssl.SSLContext()
        async with session.get(url, ssl=tls) as reply:
            return await reply.json()
    async def fetch_all(urls, loop):
        """Fetch all `urls` at once through a single session created on `loop`.

        Because gather() runs with return_exceptions=True, a failing fetch()
        shows up as an exception object in the returned list instead of
        aborting the whole batch.
        """
        async with aiohttp.ClientSession(loop=loop) as session:
            jobs = (fetch(session, url) for url in urls)
            return await asyncio.gather(*jobs, return_exceptions=True)
    queries = [
        # NOTE(review): the URL list is missing from this listing; the entry
        # below is borrowed from the sample output of the first version.
        'https://fake.domain/',
    ]
    if __name__ == '__main__':
        # you can get it once
        loop = asyncio.get_event_loop()
        # original all queries
        all_queries = queries
        # places for all results
        all_results = [None] * len(all_queries)
        # selected indexes at start
        indexes = list(range(len(all_queries)))
        for number in range(3):
            # selected queries
            queries = [all_queries[idx] for idx in indexes]
            # selected results
            results = loop.run_until_complete(fetch_all(queries, loop))
            print('\n--- try:', number+1, '--- results:', len(results), '---\n')
            new_indexes = []
            for idx, url, result in zip(indexes, queries, results):
                # idx is the position in all_queries, so the result always
                # lands in the slot of the query it belongs to
                all_results[idx] = result
                if not isinstance(result, dict):
                    # no JSON dict yet: keep this query for the next try
                    new_indexes.append(idx)
                print('url:', url)
                print('result:', result)
                print('is dict:', isinstance(result, dict))
                print('type:', type(result))
            # selected indexes after filtering correct results
            indexes = new_indexes
            if not indexes:
                # every query returned a dict - nothing left to repeat
                break