I have a program workflow as follows: 1. IO-bound (web page fetch) -> 2. CPU-bound (processing information) -> 3. IO-bound (writing results to database).
I'm presently using aiohttp to fetch the web pages, and asyncio.as_completed to gather the Step 1 tasks and pass each one to Step 2 as it completes. My concern is that Step 2 may interfere with the completion of the remaining Step 1 tasks, since it consumes CPU and blocks the event loop while it runs.
I've tried using ProcessPoolExecutor to farm the Step 2 tasks out to other processes, but the Step 2 tasks use non-pickleable data structures and functions. I've also tried ThreadPoolExecutor, and while that worked (i.e. it didn't crash), it is my understanding that doing so for CPU-bound tasks is counter-productive.
Because the workflow has an intermediate CPU-bound task, would it be more efficient to use asyncio.gather (instead of asyncio.as_completed) to complete all of the Step 1 tasks before moving on to Step 2?
Sample asyncio.as_completed code:
async with ClientSession() as session:
    tasks = {self.fetch(session, url) for url in self.urls}
    for task in asyncio.as_completed(tasks):
        raw_data = await asyncio.shield(task)
        data = self.extract_data(*raw_data)
        await self.store_data(data)
Sample asyncio.gather code:
async with ClientSession() as session:
    tasks = {self.fetch(session, url) for url in self.urls}
    results = await asyncio.gather(*tasks)
    for result in results:
        data = self.extract_data(*result)
        await self.store_data(data)
Preliminary tests with limited samples show as_completed to be slightly faster than gather: ~2.98s (as_completed) vs ~3.15s (gather). But is there an asyncio conceptual issue that would favor one solution over the other?
"I've tried ThreadPoolExecutor, [...] it is my understanding that doing so for CPU-bound tasks is counter-productive." - it is counter-productive in the sense that you won't have two such tasks running Python code in parallel on multiple CPU cores - but otherwise it works to free up your asyncio loop to keep running, even if the executor is only churning through one CPU-bound task at a time.
If you can't pickle things to send to a subprocess, running the CPU-bound tasks in a ThreadPoolExecutor is good enough.
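A minimal sketch of that approach, assuming a CPU-bound extract step similar to yours (extract_data here is a placeholder, not your actual method). The key call is loop.run_in_executor, which awaits the threaded work without blocking the event loop:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def extract_data(raw):
    # Stand-in for the real CPU-bound Step 2 processing.
    return raw.upper()

async def handle(raw, pool):
    loop = asyncio.get_running_loop()
    # The event loop stays free to service other fetches while
    # extract_data runs in a worker thread.
    return await loop.run_in_executor(pool, extract_data, raw)

async def main():
    # Sharing one pool across tasks avoids spawning a thread per item.
    with ThreadPoolExecutor() as pool:
        return await asyncio.gather(*(handle(r, pool) for r in ["a", "b"]))

print(asyncio.run(main()))  # ['A', 'B']
```

In your code, the run_in_executor call would replace the direct `self.extract_data(*raw_data)` call inside the as_completed loop.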
Otherwise, just sprinkle your CPU-bound code with some await asyncio.sleep(0) calls (inside the loops) and run it normally as coroutines: that is enough for a CPU-bound task not to lock up the asyncio loop.
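To illustrate the sleep(0) trick (all names here are made up for the demo): the periodic await hands control back to the event loop, so a concurrent task gets to run interleaved with the CPU-bound one instead of starving until it finishes:

```python
import asyncio

async def crunch(items):
    total = 0
    for i, x in enumerate(items):
        total += x * x               # stand-in for real CPU work
        if i % 1000 == 0:
            await asyncio.sleep(0)   # yield to the event loop periodically
    return total

async def heartbeat(stop):
    # Counts how many turns it gets while crunch() is running.
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0)
    return ticks

async def main():
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(stop))
    total = await crunch(range(100_000))
    stop.set()
    ticks = await hb
    return total, ticks

total, ticks = asyncio.run(main())
# ticks > 0 shows the other task ran interleaved with the CPU work;
# without the sleep(0), heartbeat would not run until crunch finished.
```

The trade-off is granularity: yield too often and the scheduling overhead adds up, too rarely and other tasks (like your pending fetches) still see long stalls.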