Tags: python, multithreading, concurrency, multiprocessing

An async/parallel approach to working a (potentially) growing task queue


I have a list of items that need to be processed and I want to be able to process them in parallel for efficiency. But during the processing of one item I may discover more items that need to be added to the list to be processed.

I've looked at the multiprocessing and concurrent.futures libraries, but I couldn't find a queue of this sort — one that can be modified at runtime, after it's been handed to the pool. Is there a solution that does what I need?

Here's some code that demonstrates what I want:

i = 0
jobs_to_be_processed = [f'job{(i := i + 1)}' for _ in range(5)]

def process_job(job):
    global i  # the walrus assignment below rebinds i, so it must be declared global
    if int(job[-1]) % 3 == 0:
        jobs_to_be_processed.append(f'new job{(i := i + 1)}')
    # do process job ...

# Add jobs to a (hypothetical) pool that allows `jobs_to_be_processed`
# to have jobs added while processing
pool = AsyncJobPool(jobs_to_be_processed)
pool.start()
pool.join()

Solution

  • IIUC you can use asyncio.Queue where you put your items to be processed, e.g.:

    import asyncio
    
    
    async def worker(queue: asyncio.Queue):
        while True:
            item = await queue.get()
    
            if item == 'spawn more jobs':
                print('Spawning more jobs!')
                queue.put_nowait('other1')
                queue.put_nowait('other2')
            else:
                await asyncio.sleep(1)
                print(f'Processed job item: {item}')
    
            queue.task_done()
    
    async def main():
        q = asyncio.Queue()
    
        # we have pool of 2 workers that work concurrently:
        workers = [asyncio.create_task(worker(q)) for i in range(2)]
    
        # initially we have 4 job items (one item spawns 2 more jobs):
        for job in ['job1', 'job2', 'spawn more jobs', 'job3']:
            q.put_nowait(job)
    
        await q.join()
    
        for w in workers:
            w.cancel()
    
    asyncio.run(main())
    

    This prints:

    Processed job item: job1
    Spawning more jobs!
    Processed job item: job2
    Processed job item: job3
    Processed job item: other1
    Processed job item: other2
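
  • If the jobs are CPU-bound and you want true parallelism rather than single-process concurrency, the same pattern can be adapted to concurrent.futures: have each job return any newly discovered jobs, and resubmit them from the parent process as futures complete. A minimal sketch (the job names and the 'spawn more jobs' sentinel are placeholders mirroring the example above):

    ```python
    from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait


    def process_job(job):
        """Process one job; return (job, list_of_newly_discovered_jobs)."""
        new_jobs = []
        if job == 'spawn more jobs':  # placeholder condition for discovering work
            new_jobs = ['other1', 'other2']
        # ... do the actual processing here ...
        return job, new_jobs


    def run_pool(initial_jobs, max_workers=2):
        processed = []
        with ProcessPoolExecutor(max_workers=max_workers) as pool:
            pending = {pool.submit(process_job, job) for job in initial_jobs}
            while pending:
                # Block until at least one future finishes, then resubmit
                # any jobs that were discovered while it was running.
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    job, new_jobs = future.result()
                    processed.append(job)
                    pending |= {pool.submit(process_job, j) for j in new_jobs}
        return processed


    if __name__ == '__main__':
        print(run_pool(['job1', 'job2', 'spawn more jobs', 'job3']))
    ```

    The key difference from the question's sketch is that worker processes don't share memory with the parent, so a worker can't append to a shared list; instead, newly discovered jobs are returned to the parent, which resubmits them to the pool.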