
python asyncio.gather vs asyncio.as_completed when IO task followed by CPU-bound task


I have a program workflow as follows: 1. IO-bound (web page fetch) -> 2. cpu-bound (processing information) -> 3. IO-bound (writing results to database).

I'm currently using aiohttp to fetch the web pages, and asyncio.as_completed to consume Step 1 tasks and pass their results to Step 2 as they complete. My concern is that Step 2 may be interfering with the completion of Step 1 tasks by consuming CPU resources and blocking the event loop.

I've tried to use ProcessPoolExecutor to farm out the Step 2 tasks to other processes, but the Step 2 tasks use non-pickleable data structures and functions. I've tried ThreadPoolExecutor, and while that worked (i.e., it didn't crash), it is my understanding that doing so for CPU-bound tasks is counter-productive.

Because the workflow has an intermediate CPU-bound task, would it be more efficient to use asyncio.gather (instead of asyncio.as_completed) to complete all of the Step 1 tasks before moving on to Step 2?

Sample asyncio.as_completed code:

async with ClientSession() as session:
    tasks = {self.fetch(session, url) for url in self.urls}
    for task in asyncio.as_completed(tasks):
        raw_data = await asyncio.shield(task)
        data = self.extract_data(*raw_data)
        await self.store_data(data)

Sample asyncio.gather code:

async with ClientSession() as session:
    tasks = {self.fetch(session, url) for url in self.urls}
    results = await asyncio.gather(*tasks)
for result in results:
    data = self.extract_data(*result)
    await self.store_data(data)

Preliminary tests with limited samples show as_completed to be slightly faster than gather: ~2.98s (as_completed) vs ~3.15s (gather). But is there an asyncio conceptual issue that would favor one solution over the other?


Solution

  • "I've tried ThreadPoolExecutor, [...] it is my understanding that doing so for CPU-bound tasks is counter-productive." - it is counter-productive in the sense that you won't have two such tasks running Python code in parallel on multiple CPU cores (because of the GIL) - but otherwise it works: it frees up your asyncio loop to keep running, even if the executor is only crunching code for one task at a time.

    If you can't pickle things to a subprocess, running the CPU-bound tasks in a ThreadPoolExecutor is good enough.
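A minimal sketch of that approach, with a hypothetical stand-in for the question's extract_data step (the real function and its arguments are assumptions here):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the CPU-bound Step 2 work.
def extract_data(n):
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # run_in_executor moves the CPU-bound call off the event loop
        # thread, so pending fetches can keep making progress while
        # one extract_data call is running.
        return await asyncio.gather(
            *(loop.run_in_executor(pool, extract_data, n) for n in (10, 20, 30))
        )

print(asyncio.run(main()))
```

Only one thread runs Python bytecode at a time under the GIL, but the event loop thread is no longer the one doing the number crunching, which is exactly what keeps the Step 1 fetches responsive.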

    Otherwise, just sprinkle your CPU-bound code with some await asyncio.sleep(0) calls (inside the loops) and run it normally as coroutines: that is enough to keep a CPU-bound task from locking up the asyncio loop.
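A sketch of the sleep(0) technique, again using a hypothetical CPU-bound loop in place of the question's actual processing code:

```python
import asyncio

async def crunch(n):
    # Hypothetical CPU-bound loop standing in for Step 2 processing.
    total = 0
    for i in range(n):
        total += i * i
        if i % 1000 == 0:
            # Yield control back to the event loop periodically (not on
            # every iteration, to keep the yielding overhead low), so
            # other tasks such as pending fetches can run in between.
            await asyncio.sleep(0)
    return total

async def main():
    # Two CPU-bound coroutines now interleave on the single event loop
    # instead of one blocking everything else until it finishes.
    return await asyncio.gather(crunch(10_000), crunch(20_000))

print(asyncio.run(main()))
```

This still uses only one CPU core, but it prevents any single chunk of computation from monopolizing the loop for its full duration.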