I'm currently using `multiprocessing` and `asyncio` to process a huge amount of data. However, my code keeps randomly hanging after it finishes processing a batch of items (200 in my case): `queue.join()` never unblocks, so the program never moves on to the next batch.
According to the docs:
Block until all items in the queue have been gotten and processed. The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
I made sure to call queue.task_done()
for all items in every batch of data per worker, yet it still happens. Am I understanding something incorrectly? What am I doing wrong? What can I improve?
Minimal reproducible code:
import asyncio
import logging
import random
from multiprocessing import JoinableQueue, Process
N_PROCESSES = 4  # OS processes spawned by start_processes()
N_WORKERS = 8    # concurrent asyncio worker tasks created inside each process
def create_queue(zpids: list[int]) -> JoinableQueue:
    """Return a JoinableQueue pre-loaded with every zpid, preserving order."""
    work_queue = JoinableQueue()
    for item in zpids:
        work_queue.put(item)
    return work_queue
async def worker(i_process: int, queue: JoinableQueue):
    # Drain the shared queue in batches of up to `query_size` items,
    # calling task_done() once per fetched item.
    #
    # NOTE(review): this is the buggy version the question is about.
    # qsize() is only approximate for multiprocessing queues, and
    # queue.get(timeout=0.01) can raise queue.Empty partway through the
    # list comprehension below. When that happens the partially built
    # list is discarded, so task_done() is never called for the items
    # already removed from the queue -- and queue.join() in the parent
    # then blocks forever.
    # Process items in batch of 200
    query_size = 200
    while True:
        batch = [queue.get(timeout=0.01) for _ in range(min(queue.qsize(), query_size))]
        if not batch:
            break
        logging.info("Faking some tasks...")
        for _ in batch:
            queue.task_done()
async def async_process(i_process: int, queue: JoinableQueue):
    """Launch N_WORKERS worker tasks against *queue* and wait for all of them.

    return_exceptions=True means a crashing worker does not cancel its
    siblings; the gather simply collects the exception.
    """
    logging.info(f"p:{i_process} - launching workers...")
    tasks = []
    for _ in range(N_WORKERS):
        tasks.append(asyncio.create_task(worker(i_process, queue)))
    await asyncio.gather(*tasks, return_exceptions=True)
def async_process_wrapper(i_process: int, zpids: JoinableQueue):
    """Child-process entry point: run the async consumer pipeline to completion."""
    coro = async_process(i_process, zpids)
    asyncio.run(coro, debug=True)
def start_processes(queue: JoinableQueue):
    """Fork N_PROCESSES consumer processes over *queue*, then block in
    queue.join() until every queued item has been task_done().

    NOTE(review): the Process handles are not kept, so the children are
    never .join()ed -- presumably intentional here, since queue.join()
    is the synchronization point; confirm for production use.
    """
    for proc_index in range(N_PROCESSES):
        child = Process(target=async_process_wrapper, args=(proc_index, queue))
        child.start()
    queue.join()
def main():
    """Generate 200k random ints, load them into a queue, and process them."""
    values = [random.randrange(1, 1000) for _ in range(200000)]
    work_queue = create_queue(values)
    start_processes(work_queue)
# Script guard: required with multiprocessing on spawn-based platforms
# (Windows/macOS) so child processes can re-import this module without
# recursively re-running main().
if __name__ == "__main__":
    main()
`qsize()` is not reliable for multiprocessing queues. Worse, `queue.get(timeout=0.01)` will randomly raise a `queue.Empty` exception near the end of the queue, and because the exception escapes from inside the list comprehension, the partially built list is discarded — so `task_done()` is never executed for the items that were already successfully fetched. Those "orphaned" items keep the unfinished-task count above zero, and `queue.join()` blocks forever. The following change should fix the problem.
from queue import Empty
async def worker(i_process: int, queue: JoinableQueue):
    """Consume queue items in batches of up to 200, marking each one done.

    Items are pulled one at a time until the batch is full or queue.get()
    times out with queue.Empty. A partially filled batch is still
    processed, so every item removed from the queue gets a matching
    task_done() call -- this is what keeps queue.join() from hanging.
    """
    batch_limit = 200  # process items in batches of 200
    while True:
        batch = []
        try:
            while len(batch) < batch_limit:
                batch.append(queue.get(timeout=0.01))
        except Empty:
            pass  # keep whatever was fetched before the timeout
        if not batch:
            break  # queue appears drained; let this worker exit
        logging.info("Faking some tasks...")
        for _ in batch:
            queue.task_done()