Tags: python-asyncio, process-pool

asyncio: can a task only start when the previous task reaches a pre-defined stage?


I am getting started with asyncio, which I wish to apply to the following problem:

  • Data is split in chunks.
  • A chunk is 1st compressed.
  • Then the compressed chunk is written in the file.
  • A single file is used for all chunks, so I need to process them one by one.
with open('my_file', 'w+b') as f:
    for chunk in chunks:
        compressed = compress_chunk(chunk)
        f.write(compressed)

In this context, could the process be sped up by triggering the compress step of the next iteration as soon as the write step of the current iteration starts?

Can I do that with asyncio, keeping a similar for loop structure? If so, could you share some pointers?

I am guessing another way to run this in parallel is to use a ProcessPoolExecutor and fully split the compress phase from the write phase: first compress all chunks in different worker processes,

and only once all chunks are compressed, start the writing step. But I would like to investigate the asyncio approach first, if it makes sense.
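
For that second option, I mean roughly this (just a sketch, assuming compress_chunk returns the compressed bytes and is a plain module-level function, so it can be pickled to the worker processes):

from concurrent.futures import ProcessPoolExecutor

def compress_then_write(chunks):
    # phase 1: compress every chunk in parallel worker processes;
    # map() yields results in the original chunk order
    with ProcessPoolExecutor() as pool:
        compressed = list(pool.map(compress_chunk, chunks))
    # phase 2: write the compressed chunks sequentially, in order
    with open('my_file', 'w+b') as f:
        for chunk in compressed:
            f.write(chunk)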

Thanks in advance for any help. Best


Solution

  • You can do this with a producer-consumer model. As long as there is one producer and one consumer, the chunks will be written in the correct order, and for your use case that is all the concurrency you can benefit from. You should also use the aiofiles library: standard file I/O would mostly block your main compression/producer thread, and you wouldn't see much speedup. Try something like this:

    import asyncio
    import aiofiles
    
    
    async def produce(queue, chunks):
        for chunk in chunks:
            # compress_chunk is assumed to return the compressed bytes
            compressed = compress_chunk(chunk)
            await queue.put(compressed)
    
    
    async def consume(queue):
        # open in binary mode, since the compressed chunks are bytes
        async with aiofiles.open('my_file', 'wb') as f:
            while True:
                compressed_chunk = await queue.get()
                await f.write(compressed_chunk)
                queue.task_done()
    
    
    async def main():
        queue = asyncio.Queue()
    
        producer = asyncio.create_task(produce(queue, chunks))
        consumer = asyncio.create_task(consume(queue))
    
        # wait for the producer to finish
        await producer
    
        # wait for the consumer to finish processing and cancel it
        await queue.join()
        consumer.cancel()
    
    asyncio.run(main())
    

    https://github.com/Tinche/aiofiles

    Using asyncio.Queue for producer-consumer flow
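
    One caveat: compress_chunk itself is synchronous, CPU-bound code, so it still blocks the event loop while it runs. If that becomes the bottleneck, produce could offload the compression to a process pool via loop.run_in_executor, combining this pattern with the executor idea from the question. A sketch, assuming compress_chunk is picklable:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    
    async def produce(queue, chunks, pool):
        loop = asyncio.get_running_loop()
        for chunk in chunks:
            # run the CPU-bound compression in a worker process so the
            # event loop (and the consumer) stays responsive
            compressed = await loop.run_in_executor(pool, compress_chunk, chunk)
            await queue.put(compressed)
    
    # in main(), create the pool once and pass it to the producer:
    #     with ProcessPoolExecutor() as pool:
    #         producer = asyncio.create_task(produce(queue, chunks, pool))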