I am getting started with asyncio and would like to apply it to the following problem:
with open('my_file', 'w+b') as f:
    for chunk in chunks:
        compress_chunk(chunk)
        f.write(chunk)
In this setting, to speed things up, could the compress step of the next iteration be triggered as soon as the write step of the current iteration starts? Can I do that with asyncio, keeping a similar for-loop structure? If so, could you share some pointers?
I am guessing another way to run this in parallel is to use a ProcessPoolExecutor and fully split the compress phase from the write phase: first compress all the chunks in separate worker processes, and only once every chunk is compressed start the writing step.
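Roughly, I picture that second approach like the sketch below (the helper name is just for illustration, and here compress_chunk would have to return the compressed bytes, since worker processes cannot modify my objects in place):

from concurrent.futures import ProcessPoolExecutor

def compress_all_then_write(chunks):
    # phase 1: compress every chunk in separate worker processes
    with ProcessPoolExecutor() as executor:
        compressed_chunks = list(executor.map(compress_chunk, chunks))
    # phase 2: only once everything is compressed, write sequentially
    with open('my_file', 'w+b') as f:
        for compressed in compressed_chunks:
            f.write(compressed)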
But I would first like to investigate the asyncio approach, if it makes sense.
Thanks in advance for any help. Best
You can do this with a producer-consumer pattern. With a single producer and a single consumer, the queue preserves the chunk order, which is all you need for your use case. You should also use the aiofiles library: standard blocking file I/O runs on the event loop and holds up your compression/producer coroutine, so you wouldn't see much speedup. Try something like this:
import asyncio
import aiofiles

async def produce(queue, chunks):
    for chunk in chunks:
        # compress the current chunk, as in your loop, then hand it to the writer
        compress_chunk(chunk)
        await queue.put(chunk)

async def consume(queue):
    async with aiofiles.open('my_file', 'wb') as f:
        while True:
            compressed_chunk = await queue.get()
            await f.write(compressed_chunk)
            queue.task_done()

async def main():
    # bounded queue so the producer yields to the writer instead of
    # compressing every chunk before the first write happens
    queue = asyncio.Queue(maxsize=1)
    producer = asyncio.create_task(produce(queue, chunks))
    consumer = asyncio.create_task(consume(queue))
    # wait for the producer to finish
    await producer
    # wait for the consumer to finish processing and cancel it
    await queue.join()
    consumer.cancel()

asyncio.run(main())
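The bounded queue (maxsize=1) is what forces the producer to pause and let the writer run; with an unbounded queue the producer would compress every chunk before the consumer ever got scheduled. Also note that compress_chunk still runs synchronously inside the producer coroutine, so it blocks the event loop while it compresses; the overlap you gain is between compressing the next chunk and writing the previous one, which aiofiles performs in a background thread. If you later want the compression itself off the event loop as well, you could combine the queue with run_in_executor, roughly like this (just a sketch, assuming the chunks are picklable and that compress_chunk returns the compressed data):

import asyncio
from concurrent.futures import ProcessPoolExecutor

async def produce(queue, chunks, executor):
    loop = asyncio.get_running_loop()
    for chunk in chunks:
        # run the CPU-bound compression in a worker process so the event loop
        # stays free to drive the writes; compress_chunk is assumed to return
        # the compressed bytes here
        compressed = await loop.run_in_executor(executor, compress_chunk, chunk)
        await queue.put(compressed)

# in main(): create executor = ProcessPoolExecutor() and pass it to produce()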