python, python-asyncio, semaphore

How do I use a Semaphore with asyncio.as_completed in Python?


SETUP: I have a large list (100+) of tasks (coroutines) that connect to a REST API database server. The coroutines use a client connection pool. I suspect the pool is cutting me off, because I do not get all of my results. I also think I could use a Semaphore to limit the number of concurrent connections to the API server and get all of my results before my script finishes. Here's a minimal example:

q = Queue(-1)
progress = tqdm(total=total_hits)
sem = asyncio.Semaphore(1)
for task in asyncio.as_completed(tasks):
    async with sem:
        res = await task
        q.put_nowait(res["data"])
        progress.update(len(res["data"]))
        while res["links"].get("next", None) is not None:
            res = await client.get_json_async(res["links"]["next"])
            q.put_nowait(res["data"])
            progress.update(len(res["data"]))

PROBLEM: I know that there are 10,000 data points to capture, but I consistently capture only about half of them. I think it's because the client is limiting my TCP connections to the server.
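One way to check whether the semaphore in the loop above limits anything is to measure how many requests are actually in flight at once. The following is a minimal sketch with a simulated request; `InFlightGauge`, `fake_request` and the other function names are hypothetical and stand in for the real client. It compares acquiring the semaphore in the loop that consumes `asyncio.as_completed` (as in the question) with acquiring it inside each coroutine.

```python
import asyncio


class InFlightGauge:
    """Hypothetical helper: tracks current and peak number of requests in flight."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0


async def fake_request(gauge: InFlightGauge) -> None:
    """Simulated request; stands in for the real REST call."""
    gauge.current += 1
    gauge.peak = max(gauge.peak, gauge.current)
    await asyncio.sleep(0.01)
    gauge.current -= 1


async def semaphore_in_loop(n: int) -> int:
    """Semaphore acquired in the consumer loop, as in the question."""
    gauge = InFlightGauge()
    sem = asyncio.Semaphore(1)
    # as_completed wraps every coroutine in a Task right away, so all n start.
    for fut in asyncio.as_completed([fake_request(gauge) for _ in range(n)]):
        async with sem:  # only serializes the consumer, not the requests
            await fut
    return gauge.peak


async def limited_request(gauge: InFlightGauge, sem: asyncio.Semaphore) -> None:
    """Semaphore acquired inside each coroutine, around the request itself."""
    async with sem:
        await fake_request(gauge)


async def semaphore_in_coroutine(n: int, limit: int) -> int:
    """Same workload, but each coroutine holds the semaphore while it runs."""
    gauge = InFlightGauge()
    sem = asyncio.Semaphore(limit)
    for fut in asyncio.as_completed([limited_request(gauge, sem) for _ in range(n)]):
        await fut
    return gauge.peak


async def main() -> None:
    print("peak with semaphore in loop:", await semaphore_in_loop(20))
    print("peak with semaphore in coroutine:", await semaphore_in_coroutine(20, 3))


if __name__ == "__main__":
    asyncio.run(main())
```

With these numbers the first placement reports a peak of 20 (every request runs at once) and the second a peak of 3. Whether this explains the missing data points depends on the real client, which the sketch does not model.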

Any ideas?


Solution

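  • You can combine asyncio.Semaphore and asyncio.as_completed as follows. Acquire the semaphore inside each coroutine, not in the loop that consumes as_completed: as_completed schedules all of the coroutines at once, so a semaphore held in the consumer loop does not limit how many requests are in flight.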

    import asyncio
    import time
    
    
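    async def make_request(url: str, s: asyncio.Semaphore) -> str:
        """Simulate a request; at most Semaphore(n) of these sleep at once."""
        async with s:
            # Only the code inside this block is limited by the semaphore.
            await asyncio.sleep(2)

        return f"{url}: {time.monotonic()}"


    async def amain():
        """Main wrapper."""
        # Allow at most 5 simulated requests in flight at any time.
        s = asyncio.Semaphore(5)
        tasks = [make_request(f"url-{i}", s) for i in range(26)]
        # as_completed schedules every coroutine immediately; the semaphore
        # inside make_request is what throttles them.
        for cor in asyncio.as_completed(tasks):
            res = await cor
            print(res)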
    
    
    if __name__ == '__main__':
        asyncio.run(amain())
    
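Applied to the pagination loop from the question, the same idea means each coroutine follows its own `links.next` chain and acquires the semaphore around each page request. This is a minimal sketch, assuming the response shape `{"data": [...], "links": {"next": url or None}}` seen in the question; `FakeClient` only simulates the real client (it keeps the question's `get_json_async` name), and `fetch_all_pages` is a hypothetical helper.

```python
import asyncio
from typing import Any, Dict, Optional


class FakeClient:
    """Hypothetical stand-in for the question's client: every start URL has
    PAGES pages of PAGE_SIZE items, chained through links["next"]."""

    PAGES = 3
    PAGE_SIZE = 5

    async def get_json_async(self, url: str) -> Dict[str, Any]:
        await asyncio.sleep(0.01)  # simulated network latency
        base, _, page = url.partition("?page=")
        page_no = int(page) if page else 0
        has_next = page_no + 1 < self.PAGES
        return {
            "data": [f"{base}/{page_no}/{k}" for k in range(self.PAGE_SIZE)],
            "links": {"next": f"{base}?page={page_no + 1}" if has_next else None},
        }


async def fetch_all_pages(client: FakeClient, url: str,
                          sem: asyncio.Semaphore, q: asyncio.Queue) -> int:
    """Follow links["next"] from one start URL; each page request holds the semaphore."""
    count = 0
    next_url: Optional[str] = url
    while next_url is not None:
        async with sem:  # at most `limit` requests in flight across all coroutines
            res = await client.get_json_async(next_url)
        q.put_nowait(res["data"])
        count += len(res["data"])
        next_url = res["links"].get("next")
    return count


async def main(n_start_urls: int = 10, limit: int = 3) -> tuple:
    client = FakeClient()
    sem = asyncio.Semaphore(limit)
    q: asyncio.Queue = asyncio.Queue()  # maxsize 0 means unbounded
    coros = [fetch_all_pages(client, f"url-{i}", sem, q) for i in range(n_start_urls)]
    total = 0
    for cor in asyncio.as_completed(coros):
        total += await cor  # a progress bar could be updated here instead
    return total, q.qsize()


if __name__ == "__main__":
    items, pages = asyncio.run(main())
    print(f"{items} items from {pages} pages")
```

With the defaults this prints `150 items from 30 pages`. Holding the semaphore per page rather than per chain lets pages from different start URLs interleave while still capping concurrent requests.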

    On the other hand, if you need to limit the number of requests per unit of time because of an API rate limit, you will probably need a rate limiter, for example one from a third-party library.
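A semaphore caps how many requests run at the same time; a rate limiter caps how many start within a time window, and the two can be combined. The following is a minimal sliding-window sketch of the second idea, used together with `asyncio.as_completed`. `SlidingWindowLimiter` and `rate_limited_request` are hypothetical names for illustration; this is not the API of BucketRateLimiter or any other library.

```python
import asyncio
import time
from collections import deque


class SlidingWindowLimiter:
    """Hypothetical limiter: allow at most `max_calls` acquisitions per `period` seconds.

    Illustration only; not the API of BucketRateLimiter or any other library.
    """

    def __init__(self, max_calls: int, period: float) -> None:
        self.max_calls = max_calls
        self.period = period
        self._stamps: deque = deque()  # monotonic times of recent acquisitions
        self._lock = asyncio.Lock()  # waiters queue up here one at a time

    async def acquire(self) -> float:
        """Wait until the window has a free slot; return the recorded timestamp."""
        async with self._lock:
            while True:
                now = time.monotonic()
                # Forget acquisitions that have slid out of the window.
                while self._stamps and now - self._stamps[0] >= self.period:
                    self._stamps.popleft()
                if len(self._stamps) < self.max_calls:
                    self._stamps.append(now)
                    return now
                # Window is full: sleep until the oldest acquisition expires.
                await asyncio.sleep(self.period - (now - self._stamps[0]))


async def rate_limited_request(limiter: SlidingWindowLimiter, url: str) -> float:
    """Simulated request that must pass the limiter before it is 'sent'."""
    started = await limiter.acquire()
    await asyncio.sleep(0.01)  # stand-in for the real request to `url`
    return started


async def main(n: int = 12, max_calls: int = 4, period: float = 0.1) -> list:
    # Create the limiter inside the running loop so its Lock binds correctly.
    limiter = SlidingWindowLimiter(max_calls, period)
    coros = [rate_limited_request(limiter, f"url-{i}") for i in range(n)]
    starts = [await cor for cor in asyncio.as_completed(coros)]
    return sorted(starts)


if __name__ == "__main__":
    stamps = asyncio.run(main())
    print([round(t - stamps[0], 2) for t in stamps])
```

Holding the lock while sleeping keeps waiters in order; a library implementation may use a different algorithm (for example a token bucket).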

    I am the author of one such library, BucketRateLimiter, and would be grateful if you used it.