I have a list of items to process, and I'd like to process them in parallel for efficiency. However, while processing one item I may discover more items that need to be added to the list for processing.
I've looked at the multiprocessing and concurrent.futures libraries, but I couldn't find a queue of this sort: one that can still have items added to it at runtime, after it's been passed to the pool. Is there a solution that does this?
Here's some code that demonstrates what I want:
i = 0
jobs_to_be_processed = [f'job{(i := i + 1)}' for _ in range(5)]

def process_job(job):
    global i
    # Processing one job may discover new jobs:
    if int(job[-1]) % 3 == 0:
        jobs_to_be_processed.append(f'new job{(i := i + 1)}')
    # do process job ...
    pass

# Add jobs to a pool that allows `jobs_to_be_processed`
# to have jobs added while processing
pool = AsyncJobPool(jobs_to_be_processed)
pool.start()
pool.join()
IIUC, you can use asyncio.Queue to hold the items to be processed, e.g.:
import asyncio

async def worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item == 'spawn more jobs':
            print('Spawning more jobs!')
            # New items can be added to the queue while workers are running:
            queue.put_nowait('other1')
            queue.put_nowait('other2')
        else:
            await asyncio.sleep(1)
            print(f'Processed job item: {item}')
        # Mark the item as done so queue.join() can eventually unblock:
        queue.task_done()

async def main():
    q = asyncio.Queue()

    # we have a pool of 2 workers that work concurrently:
    workers = [asyncio.create_task(worker(q)) for _ in range(2)]

    # initially we have 4 job items (one item spawns 2 more jobs):
    for job in ['job1', 'job2', 'spawn more jobs', 'job3']:
        q.put_nowait(job)

    # wait until all items, including ones added later, are processed:
    await q.join()

    # the workers loop forever, so cancel them once the queue is drained:
    for w in workers:
        w.cancel()

asyncio.run(main())
This prints:
Processed job item: job1
Spawning more jobs!
Processed job item: job2
Processed job item: job3
Processed job item: other1
Processed job item: other2
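If you specifically want threads or processes rather than asyncio (the question mentions the multiprocessing and concurrent.futures libraries), the same grow-while-running idea can be expressed with an executor: have each job return whatever new jobs it discovered, and resubmit them from the main loop until nothing is pending. Here's a minimal sketch of that pattern, assuming thread-based workers (process_job and run_all are illustrative names, not part of any library):

import concurrent.futures

def process_job(job):
    # Process one job and return any newly discovered jobs.
    new_jobs = []
    if job == 'spawn more jobs':
        print('Spawning more jobs!')
        new_jobs = ['other1', 'other2']
    else:
        print(f'Processed job item: {job}')
    return new_jobs

def run_all(initial_jobs, max_workers=2):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(process_job, job) for job in initial_jobs}
        while pending:
            # Wait for at least one job to finish, then resubmit
            # whatever new jobs it discovered:
            done, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                pending |= {pool.submit(process_job, j) for j in future.result()}

run_all(['job1', 'job2', 'spawn more jobs', 'job3'])

Swapping ThreadPoolExecutor for ProcessPoolExecutor gives real parallelism for CPU-bound jobs, at the cost of requiring process_job and its arguments to be picklable.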