I have a computational pipeline that processes hundreds of data files. Currently, it parallelizes the task by having multiple processes simultaneously processing their own files (using snakemake).
I'm rewriting the pipeline to do the computation on GPU, which should be a significant speed-up, but now the parallelization is within the computation of a single file, so they are processed sequentially.
I'm thinking that I can probably save some time by reading the next file to be processed from disk while the current file is being processed on the GPU, and then write the result from the previous iteration to disk while the next iteration is computing.
This seems like something that asyncio was designed for, but I'm not too familiar with it. Is that the way I should go, or would it be better and/or easier to implement using the threading package or some other method?
Yes, I think you are correct. But beware of reading/writing files with asyncio: there is one trap. Since file reading is technically an I/O operation, asyncio should yield some performance gain. And it will, but first things first.
For quite a long time there was a lack of a good aio library for files in Python. Even now the non-cheating option is available only on Linux (the `caio` library must be available); note that your OS must have native support for asynchronous file operations. The first attempt at an async file library for Python was `aiofiles`. This library cheats a little. You can dig into that on your own, but long story short, it uses threads to simulate parallelization of read/write tasks. Because of Python's GIL, this will not result in any performance gain (in fact, performance will drop due to the additional overhead of thread management).
The second library that exposes a good asynchronous interface for files is `aiofile` (note the very similar name, which is quite confusing). Since version 2.0.0, this library uses `caio` and thus native Linux support for async file operations. Stick with this library to get the performance gain.
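As a minimal sketch of what a read with `aiofile` looks like (the path is a placeholder, and this assumes a recent `aiofile` version that exposes `async_open`):

```python
import asyncio
from aiofile import async_open

async def read_file(path: str) -> str:
    # on Linux with caio available, this is a genuinely asynchronous
    # read, not a thread pool in disguise
    async with async_open(path, "r") as afp:
        return await afp.read()

content = asyncio.run(read_file("/tmp/example.txt"))
```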
Back to the original question, you have some freedom of choice in how to implement it. The most general solution would be to use `asyncio.gather` to acquire the cooperative gain. If you separate the content producer and the consumer, you can run the Nth consumer cooperatively with the (N+1)th producer:
```python
import asyncio

async def get_content(...) -> Content:
    # ... read the file from disk
    # return Content(...)

async def process_content(content: Content, ...) -> ProcessedContent:
    # ..., here is your GPU delegation
    # return ProcessedContent(...)

async def run_chunk(first_content: Content, ...) -> tuple[ProcessedContent, Content]:
    # process the Nth file while reading the (N+1)th
    first_content_processor_coro = process_content(first_content, ...)
    second_content_coro = get_content(...)
    second_content, first_process_result = await asyncio.gather(
        second_content_coro, first_content_processor_coro
    )
    return first_process_result, second_content
```
`run_chunk` does exactly what you described in the question: it processes one file while reading the next. You can then arrange the calls to `run_chunk` on your own, but it is designed to be run sequentially.
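For illustration, a sequential driver over a list of paths might look like the sketch below. It assumes `run_chunk`'s elided arguments include the path of the next file to read; adapt the signatures to your own code:

```python
async def run_all(paths: list[str]) -> list[ProcessedContent]:
    results = []
    # prime the pipeline with the first file
    content = await get_content(paths[0])
    for next_path in paths[1:]:
        # GPU processes `content` while `next_path` is being read
        result, content = await run_chunk(content, next_path)
        results.append(result)
    # the last file has no successor to prefetch
    results.append(await process_content(content))
    return results
```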
However, to get the most performance, I would simply pair up the producer and consumer, and then run the whole batch in a single gather:
```python
async def get_and_process(...) -> ProcessedContent:
    content = await get_content(...)
    return await process_content(content)

async def run_batch(...) -> list[ProcessedContent]:
    # one task per file: reads of some files overlap processing of others
    processed_results = await asyncio.gather(*[get_and_process(x) for x in X])
    return processed_results
```
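A hypothetical invocation, assuming `run_batch` is parameterized with the list of input files (the `X` above; the file names are placeholders):

```python
import asyncio

# placeholder file list; run_batch / get_and_process are defined above
paths = ["data/file_001.bin", "data/file_002.bin", "data/file_003.bin"]
results = asyncio.run(run_batch(paths))
```

One thing to keep in mind: `gather` starts every task at once, so with hundreds of files you may want to bound concurrency (for example with an `asyncio.Semaphore`) so you are not holding all inputs in memory at the same time.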