
How to add time delays, taken from the previous response's headers, between creating tasks that make requests asynchronously (asyncio)?


Background:

I'm using asyncio to make API requests asynchronously, because it takes a long time to receive each response body but my subscription allows me to send requests at a faster rate. The response headers contain a field 'time_reset' that gives the earliest time at which the next request will succeed; any call made before that time results in status code 429 (rate limit exceeded).

Goals:

  1. Asynchronously make new requests without having to wait for the results of previous ones
  2. Set a varying delay before each request, so that a new call is only made once the current time is past 'time_reset' (which is updated with each response)

Problem:

I've managed to achieve goal 1 (when the delay between requests is manually set to a fixed number), but failed at goal 2. What I'm doing now is

  1. Set a global variable time_reset whose value can be read/updated as soon as a new call is made
  2. Create tasks that retrieve the data in a loop; between task creations I await a delay based on the current value of time_reset
  3. Collect the tasks in a list and gather them at the end.

However, with the code below, it seems no delay is awaited between requests. I don't know why this happens or what I'm doing wrong. How should I modify the code (or write something new) to achieve the goals above?

Code:

import asyncio
import aiohttp
import time

async def fetch(url, headers):
    global time_reset
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            time_reset = int(response.headers.get('time_reset')) # time when next request'd be successful/not exceeding rate limit
            if response.status == 200:
                data = await response.json()
            else:
                data = None

            return data
        

async def main():
    global time_reset
    time_reset = 0

    request_urls = <a list of urls to call>
    headers = <headers with renewed token>

    tasks = []

    for url in request_urls:
        task = asyncio.create_task(fetch(url, headers))
        tasks.append(task)

        current_time = int(time.time())
        delay = max(0, time_reset - current_time + 0.5) # +0.5 to prevent lags
        await asyncio.sleep(delay) 

    datas = await asyncio.gather(*tasks)

await main() # running in jupyter notebook
        

Related Post:

In Python Aiohttp Asyncio: how to create delays between each task, the top answers set fixed delays instead of self-updating ones.

Any advice would be appreciated.


Solution

  • If I understand the implementation of aiohttp, the following things are true:

    • As soon as the expression await session.get() is evaluated, the response headers are available; response.headers is not an awaitable.
    • The response body may take a significant amount of time to download. That's the problem you are trying to solve.
    • The header 'time_reset' indicates the earliest moment at which the server will accept the next session.get call. Multiple response.json calls can run in parallel while you are waiting to make the next session.get call.

    If that's all correct, then you have to separate the logic of checking the response headers from the logic of downloading the response body, as sketched below. That's because you need to inspect the headers before you know how long to wait before making the next session.get call. What you want to parallelize is only the downloading of the response body.
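
    For illustration, here is a minimal sketch (the URL is a placeholder and a JSON body is assumed) showing that the headers are usable as soon as the awaited session.get returns, while the body is only transferred when response.json() (or response.read()) is awaited:

    import asyncio
    import aiohttp

    async def demo(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                # Headers are available here without any further awaiting.
                print(response.status, response.headers.get('time_reset'))
                # The body is only downloaded when it is awaited.
                return await response.json()

    # asyncio.run(demo("https://example.com/api"))  # placeholder URL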

    You must issue the session.get calls serially, one after the other, with the proper time delay between them. Since you have multiple Tasks running, you can use an asyncio.Lock object to achieve this.

    The main problem with your implementation is that the time delay is in the wrong place. Instead of between Task creations, it needs to go between session.get calls.

    Here is a possible implementation.

    I have created a small class to act as the "gate" that serializes the calls to session.get. That eliminates the need for the global variable. The method Fetcher.get returns a response object, which your Tasks can then use to download the data.

    Unfortunately time_reset is not a standard header, so I cannot test this code myself. I'm fairly certain there is a problem with your time logic: the standard Python function time.time() returns the floating-point number of seconds since the Unix epoch (1/1/1970), while the header presumably returns a timestamp in some internet format (a formatted string). I have ignored this problem because I don't have enough information to do anything else. Also, I don't understand why you are converting all the time values to integers; that only introduces round-off error into the sleep times, and plain floating point works fine.

    import asyncio
    import aiohttp
    import time
    
    class Fetcher:
        def __init__(self):
            self.next_time = 0
            self.lock = asyncio.Lock()
    
        async def get(self, session, url, headers):  # returns a response object
            async with self.lock:  # asyncio.Lock must be entered with 'async with'
                t = time.time()
                if t < self.next_time:
                    await asyncio.sleep(self.next_time - t)
                response = await session.get(url, headers=headers)
                # Header values are strings; assume epoch seconds (the actual format is not confirmed)
                self.next_time = float(response.headers.get('time_reset', 0))
            return response
    
    async def fetch(fetcher, url, headers):
        async with aiohttp.ClientSession() as session:
            response = await fetcher.get(session, url, headers)
            async with response:
                if response.status == 200:
                    data = await response.json()
                else:
                    data = None
        return data
            
    
    async def main():
        request_urls = "<a list of urls to call>"
        headers = "<headers with renewed token>"
    
        tasks = []
        fetcher = Fetcher()
    
        for url in request_urls:
            task = asyncio.create_task(fetch(fetcher, url, headers))
            tasks.append(task)
        datas = await asyncio.gather(*tasks)
    
    await main() # running in jupyter notebook
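
    As noted above, the format of the time_reset header is unknown. If it turns out to be an HTTP-date string rather than plain epoch seconds, a small conversion helper (purely an assumption about the possible formats) could be used before assigning self.next_time:

    from email.utils import parsedate_to_datetime

    def parse_time_reset(value: str) -> float:
        """Convert a time_reset header value to epoch seconds (formats assumed)."""
        try:
            # Plain number of seconds since the epoch, e.g. "1700000000.5"
            return float(value)
        except ValueError:
            # RFC 2822 / HTTP-date style, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
            return parsedate_to_datetime(value).timestamp()

    # Example: self.next_time = parse_time_reset(response.headers.get('time_reset', '0'))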