I'm trying to get data from an API that allows to make 4 requests per second. I'd like to use asyncio in order to get the data but I can't seem to find a way to use it properly.
This is what I have so far:
async def fetch_request(self, url):
async with self.rate_limiter:
data = requests.get(url)
return data['items'], data['next']
async def collect_data(self, urls):
data = []
for url in urls:
items, next_cursor = await self.fetch_request(url)
data.append(items)
while next_cursor:
items, next_cursor = await self.fetch_request(url)
data.append(items)
return data
class fetchUrls(self):
loop = asyncio.get_event_loop()
urls = [] #list of urls
self.rate_limiter = RateLimiter(max_calls=4, period=1)
loop.run_until_complete(asyncio.wait(self.collect_data(urls)))
Basically I first generate a list of urls that I want to request. Each of those requests can return (besides data) a cursor for the next page, that's the main problem I'm having to deal with.
I'm using a library https://github.com/RazerM/ratelimiter to handle the rate limiting part.
Right now I'm getting TypeError: expect a list of futures, not coroutine
.
I developed the following library to solve the problem BucketRateLimiter you can check it to solve your issue.
Also you can solve the problem the following way without any 3rd party library:
import asyncio
from typing import Awaitable, Any, NamedTuple
from random import randint
from datetime import datetime
import time
TASKS = [(i, i, i) for i in range(20)] # imagine it is URLs
WORKER_NUM = 1000
class BucketTimeRateLimiter:
def __init__(
self,
max_size: int = 4,
recovery_time: float = 1.0,
rest_time: float = 0.2,
) -> None:
"""
The Bucket is used to limit number of "simultaneous" operations to specified number.
:param max_size: max size of Bucket, max "simultaneous" number of operations
:param recovery_time: time to recover Bucket to full size
:param rest_time: time to give "workers" who use bucket to sleep
"""
self.max_size = max_size
self.active_slots = max_size # number of active slots at the moment
self.recovery_time = recovery_time
self.rest_time = rest_time
# used to signal "external" workers that bucket is "empty"
self.event = asyncio.Event()
def _decrement(self) -> None:
"""Decrements internal counter self.active_slots."""
if self.active_slots > 0:
self.active_slots -= 1
async def _reactivate_slots(self) -> None:
"""Every n seconds (self.recovery_time) refresh number of self.active_slots to max number."""
while True:
await asyncio.sleep(self.recovery_time)
self.active_slots = self.max_size
self.event.set()
async def wrap_operation(self, func: Awaitable, *args: Any, **kwargs: Any) -> Any:
"""Wrapper around some async function. It limits number of "simultaneous" ops."""
while True:
if self.event.is_set(): # if bucket is not empty do work
if self.active_slots == 0:
self.event.clear()
else:
self._decrement()
res = await func(*args, **kwargs)
return res
else:
await asyncio.sleep(self.rest_time)
def activate(self) -> None:
"""The method "activates" our Bucket logic."""
self.event.set() # set event flag that bucket is ready
asyncio.create_task(self._reactivate_slots())
class TestResult(NamedTuple):
"""Result of our "fetch" function """
res: int
start: str
end: str
async def some_func(x: int, y: int, z: int) -> TestResult:
"""Imagine it is a fetch function."""
format_time = "%H:%M:%S"
start = datetime.utcnow().strftime(format_time)
await asyncio.sleep(randint(1, 1)) # wait exactly 1 second :)
end = datetime.utcnow().strftime(format_time)
result = x
return TestResult(result, start, end)
async def worker(q: asyncio.Queue, bucket: BucketTimeRateLimiter) -> None:
"""Workers which do some stuff."""
while True:
item = await q.get()
res: TestResult = await bucket.wrap_operation(some_func, *item)
print(f"Result: {res.res} | {res.start} - {res.end}")
q.task_done()
async def main_entry_point() -> None:
"""Main entry point of our asyncio app."""
q = asyncio.Queue()
bucket = BucketTimeRateLimiter()
for task in TASKS:
await q.put(task) # send all tasks to the Queue
bucket.activate() # run "daemon" task
for w in [worker(q, bucket) for _ in range(1, WORKER_NUM + 1)]:
asyncio.create_task(w) # run as "daemon" task
await q.join() # wait until all task in queue were done
if __name__ == '__main__':
start_t = time.monotonic()
asyncio.run(main_entry_point())
print(f"Time passed: {time.monotonic() - start_t}")