Search code examples
python-3.x · http-request · python-asyncio

Making rate limit requests to cursor paginated api using asyncio


I'm trying to get data from an API that allows making 4 requests per second. I'd like to use asyncio in order to get the data, but I can't seem to find a way to use it properly.

This is what I have so far:

async def fetch_request(self, url):
    """Fetch one page from *url* while holding a rate-limiter slot.

    NOTE(review): ``requests.get`` is a *blocking* call, so it stalls the
    whole event loop while the request runs -- an async client (aiohttp,
    httpx) is needed for real concurrency.
    NOTE(review): ``requests.get`` returns a Response object, which is not
    subscriptable; this presumably wants ``data = requests.get(url).json()``
    before indexing ``'items'`` / ``'next'`` -- confirm the API's payload.
    """
    async with self.rate_limiter:
        data = requests.get(url)
        return data['items'], data['next']

async def collect_data(self, urls):
    """Collect all pages for every URL in *urls*, following cursors.

    Returns a list of per-page ``items`` lists (one entry per fetched page).
    """
    data = []
    for url in urls:
        items, next_cursor = await self.fetch_request(url)
        data.append(items)
        
        # NOTE(review): this re-requests the bare `url` every iteration --
        # `next_cursor` is never attached to the request, so this either
        # loops on page one forever or stops after the first page.  The
        # cursor must be appended as a query parameter before refetching.
        while next_cursor:
            items, next_cursor = await self.fetch_request(url)
            data.append(items)
    
    return data

# NOTE(review): `class fetchUrls(self)` is not valid Python -- `self` does not
# exist at class-definition time; this block was presumably meant to be a
# method (e.g. `def fetch_urls(self):`) on the class that owns fetch_request
# and collect_data.
class fetchUrls(self):
    loop = asyncio.get_event_loop()

    urls = [] #list of urls
    self.rate_limiter = RateLimiter(max_calls=4, period=1)
    # NOTE(review): `asyncio.wait()` expects an iterable of tasks/futures, not
    # a bare coroutine -- this is the source of the reported
    # "TypeError: expect a list of futures, not coroutine".  Passing the
    # coroutine straight to run_until_complete() would work.
    loop.run_until_complete(asyncio.wait(self.collect_data(urls)))

Basically, I first generate a list of URLs that I want to request. Each of those requests can return (besides data) a cursor for the next page; that cursor is the main problem I have to deal with.

I'm using a library https://github.com/RazerM/ratelimiter to handle the rate limiting part.

Right now I'm getting TypeError: expect a list of futures, not coroutine.


Solution

  • I developed the library BucketRateLimiter to solve exactly this kind of problem — you can check it out for your issue.

    Also you can solve the problem the following way without any 3rd party library:

    import asyncio
    import time
    from datetime import datetime, timezone
    from random import randint
    from typing import Awaitable, Any, NamedTuple
    
    
    TASKS = [(i, i, i) for i in range(20)]  # imagine it is URLs
    WORKER_NUM = 1000  # number of concurrent consumer tasks to spawn
    
    
    class BucketTimeRateLimiter:
        def __init__(
                self,
                max_size: int = 4,
                recovery_time: float = 1.0,
                rest_time: float = 0.2,
        ) -> None:
            """
            The Bucket is used to limit number of "simultaneous" operations to specified number.
            :param max_size: max size of Bucket, max "simultaneous" number of operations
            :param recovery_time: time to recover Bucket to full size
            :param rest_time: time to give "workers" who use bucket to sleep
            """
            self.max_size = max_size
            self.active_slots = max_size  # number of active slots at the moment
            self.recovery_time = recovery_time
            self.rest_time = rest_time
            # used to signal "external" workers that bucket is "empty"
            self.event = asyncio.Event()
    
        def _decrement(self) -> None:
            """Decrements internal counter self.active_slots."""
            if self.active_slots > 0:
                self.active_slots -= 1
    
        async def _reactivate_slots(self) -> None:
            """Every n seconds (self.recovery_time) refresh number of self.active_slots to max number."""
            while True:
                await asyncio.sleep(self.recovery_time)
                self.active_slots = self.max_size
                self.event.set()
    
        async def wrap_operation(self, func: Awaitable, *args: Any, **kwargs: Any) -> Any:
            """Wrapper around some async function. It limits number of "simultaneous" ops."""
            while True:
                if self.event.is_set():  # if bucket is not empty do work
                    if self.active_slots == 0:
                        self.event.clear()
                    else:
                        self._decrement()
                        res = await func(*args, **kwargs)
                        return res
                else:
                    await asyncio.sleep(self.rest_time)
    
        def activate(self) -> None:
            """The method "activates" our Bucket logic."""
            self.event.set()  # set event flag that bucket is ready
            asyncio.create_task(self._reactivate_slots())
    
    
    class TestResult(NamedTuple):
        """Result of one "fetch": the value plus wall-clock start/end stamps."""
        res: int    # payload produced by the fetch
        start: str  # UTC start time, formatted "%H:%M:%S"
        end: str    # UTC end time, formatted "%H:%M:%S"
    
    
    async def some_func(x: int, y: int, z: int) -> TestResult:
        """Imagine it is a fetch function.

        Sleeps one second to simulate network latency, then returns *x*
        together with UTC start/end timestamps formatted as HH:MM:SS.
        (*y* and *z* are accepted only to match the 3-tuple task shape.)
        """
        format_time = "%H:%M:%S"
        # datetime.now(timezone.utc) replaces the deprecated datetime.utcnow();
        # the formatted output is identical.
        start = datetime.now(timezone.utc).strftime(format_time)
        # The original used randint(1, 1) -- a constant 1 dressed up as
        # randomness; sleep the fixed second directly.
        await asyncio.sleep(1)
        end = datetime.now(timezone.utc).strftime(format_time)

        return TestResult(x, start, end)
    
    
    async def worker(q: asyncio.Queue, bucket: BucketTimeRateLimiter) -> None:
        """Consume the queue forever, pushing each item through the bucket."""
        while True:
            args = await q.get()
            outcome: TestResult = await bucket.wrap_operation(some_func, *args)
            print(f"Result: {outcome.res} | {outcome.start} - {outcome.end}")
            q.task_done()
    
    
    async def main_entry_point() -> None:
        """Main entry point of our asyncio app.

        Loads all TASKS into a queue, starts the bucket's refill daemon and a
        pool of workers, waits for the queue to drain, then cancels the
        (otherwise infinite) workers.
        """
        q = asyncio.Queue()
        bucket = BucketTimeRateLimiter()
        for task in TASKS:
            await q.put(task)  # send all tasks to the Queue

        bucket.activate()  # run "daemon" refill task
        # Keep references to the worker tasks: the event loop holds only weak
        # references, so unreferenced tasks can be garbage-collected mid-run.
        workers = [
            asyncio.create_task(worker(q, bucket))
            for _ in range(WORKER_NUM)
        ]

        await q.join()  # wait until all tasks in the queue are done

        # Workers loop forever -- shut them down explicitly once the queue
        # drains instead of leaving them pending at loop shutdown.
        for w in workers:
            w.cancel()
    
    if __name__ == '__main__':
        # Time the whole run with a monotonic clock (immune to wall-clock jumps).
        began = time.monotonic()
        asyncio.run(main_entry_point())
        print(f"Time passed: {time.monotonic() - began}")