python, python-asyncio, ipython-parallel

Python ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) asyncio analog


When I use ThreadPoolExecutor, I can send a batch of requests while limiting the number of parallel requests, like this:

with ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) as pool:
    results = list(pool.map(request_func, requests_input_data))

How can I reproduce this behavior with asyncio? Are there libraries for this, or should I write it myself with something like "wait until the first future has completed, then add a new request"?


Solution

  • Python's asyncio itself has the loop.run_in_executor call, which runs synchronous code inside a ThreadPoolExecutor, so you get exactly the same semantics.
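
    A minimal sketch of the run_in_executor route, reusing the names from the question (request_func, requests_input_data, MAX_PARALLEL_REQUESTS). The body of request_func here is a stand-in assumption for a blocking call, not the asker's real function:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

MAX_PARALLEL_REQUESTS = 3


def request_func(item):
    # Stand-in for a blocking request (assumption for illustration only).
    time.sleep(0.01)
    return item * 2


async def main(requests_input_data):
    loop = asyncio.get_running_loop()
    # The pool size caps how many blocking calls run at the same time.
    with ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) as pool:
        futures = [
            loop.run_in_executor(pool, request_func, item)
            for item in requests_input_data
        ]
        # gather returns results in the order the futures were passed in.
        return await asyncio.gather(*futures)


results = asyncio.run(main(range(10)))
print(results)
```

    The calls still run in threads; asyncio only awaits their completion, so this fits best when request_func cannot be rewritten as a coroutine.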

    Otherwise, if you want to add new asynchronous tasks while keeping the number of running tasks under a limit, you have to roll your own code, possibly built around asyncio.Semaphore and the asyncio.wait call.

    It is not hard to write a working version that is good enough to use. You can then extend the API as much as you want: choose how exceptions are handled (return, ignore, or raise), return partial results after a timeout, and so on.

    A version that runs everything before returning takes less code. Note that the class below is mostly boilerplate, apart from a handful of lines of core logic inside the results method. Unlike pool.map, it collects results in completion order, not submission order.

    import asyncio
    from collections import deque
    
    class AsyncExecutor:
        """Automatic Async Task manager that will limit the number of concurrent tasks started"""
    
        def __init__(self, max_workers=5, debug=False):
            self.loop = asyncio.get_running_loop()
            self.max_workers = max_workers
            self.pending_tasks = deque()
            self.tasks = set()
            self.debug = debug
    
        def submit(self, coro, args=(), kwargs=None):
            if not kwargs: kwargs = {}
            if len(self.tasks) < self.max_workers:
                self.tasks.add(self.loop.create_task(coro(*args, **kwargs)))
            else:
                self.pending_tasks.append((coro, args, kwargs))
    
        def map(self, coro, args_collection=()):
            for args in args_collection:
                self.submit(coro, args)
    
        async def results(self):
            results = []
            while self.pending_tasks or self.tasks:
                if self.debug:
                    print(f"running tasks: {len(self.tasks)}, waiting tasks: {len(self.pending_tasks)}")
                # Block until at least one of the running tasks finishes.
                done, in_process = await asyncio.wait(self.tasks, return_when=asyncio.FIRST_COMPLETED)
                self.tasks = in_process
                # Start queued coroutines to fill the slots that were freed.
                qtd_new_tasks = max(0, self.max_workers - len(in_process))
                for _ in range(qtd_new_tasks):
                    if not self.pending_tasks:
                        break
                    coro, args, kwargs = self.pending_tasks.popleft()
                    self.tasks.add(self.loop.create_task(coro(*args, **kwargs)))
                # task.result() re-raises any exception raised inside the task.
                results.extend(task.result() for task in done)
            return results
    
    
    async def test_task(i):
        await asyncio.sleep(1)
        return i
    
    async def main():
        ex = AsyncExecutor(3, debug=True)
        ex.map(test_task, [(i,) for i in range(10)])
        print(await ex.results())
    
    asyncio.run(main())
    

    Note that this code avoids creating the tasks up front: it stores the coroutine function and its arguments, as is usually done in synchronous code. That is needed because if we created the tasks at once (to keep them as objects in ".pending_tasks"), the asyncio loop would start stepping through them whether or not they were included in the call to asyncio.wait: each time async code reaches an await that yields to the loop, the loop runs all ready tasks. In "real life" these tasks could start an HTTP API transaction or an SQL request, and the target server would be overwhelmed by requests even though we carefully collect only "max_workers" results at a time.

    The simpler alternative is indeed to use an asyncio semaphore (as I noted above, and ended up not needing in this code), but inside the task code itself.

    For example:

    max_workers = 3
    semaphore = asyncio.Semaphore(max_workers)
    
    async def test_task(i):
        async with semaphore:
            # here, instead of `asyncio.sleep` we'd do an async HTTP request to a server
            await asyncio.sleep(1)
        return i
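
    For a closer match to pool.map, including results in input order, the same semaphore idea can be wrapped around an arbitrary coroutine function. This is a sketch, not a library API: bounded_map and run_one are hypothetical names, and request_func is a stand-in for the real async request.

```python
import asyncio

MAX_PARALLEL_REQUESTS = 3


async def request_func(item):
    # Stand-in for an async HTTP request (assumption for illustration only).
    await asyncio.sleep(0.01)
    return item * 2


async def bounded_map(coro_func, items, limit):
    """Hypothetical helper: run coro_func over items, at most `limit` at a time."""
    # Created inside a running loop, so it also works on older Python versions.
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item):
        # The task exists right away, but it waits here until a slot is free,
        # so coro_func itself is never started more than `limit` times at once.
        async with semaphore:
            return await coro_func(item)

    # gather keeps the results in the same order as `items`.
    return await asyncio.gather(*(run_one(item) for item in items))


async def main():
    return await bounded_map(request_func, range(10), MAX_PARALLEL_REQUESTS)


results = asyncio.run(main())
print(results)
```

    All wrapper tasks are created immediately, which is cheap; only the guarded call inside the semaphore touches the external resource.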
    
    

    If many such coroutines target the same I/O resource, one might use a decorator to automatically limit how many of them run at once.
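
    One way such a decorator might look is sketched below. limit_concurrency is a hypothetical name, not part of asyncio, and fetch is a stand-in task. The sketch assumes a single event loop for the life of the program, since the semaphore is created once and reused.

```python
import asyncio
import functools


def limit_concurrency(max_concurrent):
    """Hypothetical decorator: allow at most `max_concurrent` concurrent calls."""
    def decorator(coro_func):
        semaphore = None  # created lazily, on first call inside a running loop

        @functools.wraps(coro_func)
        async def wrapper(*args, **kwargs):
            nonlocal semaphore
            if semaphore is None:
                semaphore = asyncio.Semaphore(max_concurrent)
            async with semaphore:
                return await coro_func(*args, **kwargs)

        return wrapper
    return decorator


# Counters used only to observe how many calls overlap.
running = 0
peak = 0


@limit_concurrency(3)
async def fetch(i):
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)  # stand-in for an async HTTP or SQL call
    running -= 1
    return i


async def main():
    return await asyncio.gather(*(fetch(i) for i in range(10)))


results = asyncio.run(main())
print(results, "peak concurrency:", peak)
```

    Each decorated function gets its own semaphore here. To share one limit across several functions that hit the same server, the decorator could instead accept a semaphore object created by the caller.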