python, asynchronous, python-asyncio, aiohttp

Using backoff to retry only the failing API call instead of all the API calls with asyncio


I'm extracting information using an API that has rate limits. To speed up the process, I do this asynchronously with asyncio and aiohttp. I gather the calls in batches of 10, so I make 10 concurrent calls each time. If I receive a 429, I wait for 2 minutes and retry. For the retry part, I'm using the backoff decorator.

My problem is that the retry is executed for all 10 calls and not only for the failing call. I'm not sure how to fix that:

@backoff.on_exception(backoff.expo,aiohttp.ClientError,max_tries=20,logger=my_logger)
async def get_symbols(size,position):
    async with aiohttp.ClientSession() as session:
        queries = get_tasks(session,size,position)
        responses = await asyncio.gather(*queries)
        print("gathering responses")
        for response in responses:
            if response.status == 429:
                print(response.headers)
                print("Code 429 received waiting for 2 minutes")
                print(response)
                time.sleep(120)
                raise aiohttp.ClientError()
            else:
                query_data = await response.read()

Does anyone know a way to re-execute just the failing call and not the whole batch?


Solution

  • There are two issues in your code. The first is the duplicate sleep; you may have misunderstood how backoff works. Its whole point is to 1) try, 2) sleep for an exponentially increasing delay if there was an error, 3) retry your function/coroutine for you. The second is that get_symbols itself is decorated with backoff, so it is retried as a whole, including all 10 calls it gathers.

    How to improve?

    1. Decorate individual request function
    2. Let backoff do its "sleeping" job
    3. Let aiohttp do its job by letting it raise for HTTP error response codes (400 and above, which includes 429) by setting raise_for_status=True in the ClientSession initialiser

    It should look something like the following.

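    import asyncio

    import aiohttp
    import backoff

    # Retry only this one request; backoff does the sleeping between attempts.
    @backoff.on_exception(backoff.expo, aiohttp.ClientError, max_tries=20)
    async def send_task(client, params):
        async with client.get('https://python.org/', params=params) as resp:
            return await resp.text()

    def get_tasks(client, size, position):
        # get_param_list is your own helper that builds the query parameters
        for params in get_param_list(size, position):
            yield send_task(client, params)

    async def get_symbols(size, position):
        # raise_for_status=True turns 4xx/5xx responses into aiohttp.ClientResponseError
        async with aiohttp.ClientSession(raise_for_status=True) as client:
            tasks = get_tasks(client, size, position)
            # send_task already returns the response text, so each result is a str
            responses = await asyncio.gather(*tasks)
            for response in responses:
                print(response)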
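
To see the per-call retry without hitting a real API, here is a minimal sketch that uses only the standard library. `retry_expo` is a hypothetical, simplified stand-in for `backoff.on_exception(backoff.expo, ...)` (it is not the library's implementation), and `RateLimited` and `fake_request` are made-up names that simulate one call being rate limited on its first two attempts.

```python
import asyncio
import functools
from collections import Counter


class RateLimited(Exception):
    """Stands in for the error raised on an HTTP 429 (hypothetical)."""


def retry_expo(exc_type, max_tries=5, base_delay=0.01):
    """Simplified stand-in for backoff.on_exception(backoff.expo, ...)."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_tries + 1):
                try:
                    return await func(*args, **kwargs)
                except exc_type:
                    if attempt == max_tries:
                        raise  # give up after the last allowed attempt
                    # Non-blocking sleep with a doubling delay
                    await asyncio.sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator


attempts = Counter()  # number of attempts made per call id


@retry_expo(RateLimited)
async def fake_request(call_id):
    attempts[call_id] += 1
    # Simulate a rate limit: call 3 fails on its first two attempts
    if call_id == 3 and attempts[call_id] <= 2:
        raise RateLimited(f"429 for call {call_id}")
    return f"result-{call_id}"


async def main():
    # Ten concurrent calls, each wrapped in its own retry loop
    return await asyncio.gather(*(fake_request(i) for i in range(10)))


results = asyncio.run(main())
print(dict(attempts))
```

Only call 3 is attempted three times; the other nine run once. Because the decorator sits on the individual coroutine and waits with `asyncio.sleep`, the retrying call does not force the rest of the batch to run again.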