Tags: python, multithreading, asynchronous, multiprocessing

Hang during queue.join() when asynchronously processing a queue


I'm currently using multiprocessing and asyncio to process a huge amount of data. However, my code keeps randomly hanging after it finishes processing a batch of items (200 in my case): queue.join() never unblocks, so the remaining data is never processed.

According to the docs:

Block until all items in the queue have been gotten and processed. The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
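
To make sure I'm reading that correctly, here is a tiny sketch of how I understand the counter (a single process with one thread standing in for my real workers; the names are made up for illustration):

from multiprocessing import JoinableQueue
from threading import Thread


def drain(q: JoinableQueue):
    # task_done() must be called once per item that was put();
    # only then does join() unblock.
    for _ in range(3):
        q.get()
        q.task_done()


q = JoinableQueue()
for item in (1, 2, 3):
    q.put(item)  # unfinished-task count is now 3

t = Thread(target=drain, args=(q,))
t.start()
q.join()  # returns only after three task_done() calls
t.join()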

I made sure to call queue.task_done() for all items in every batch of data per worker, yet it still happens. Am I understanding something incorrectly? What am I doing wrong? What can I improve?

Minimal reproducible code:

import asyncio
import logging
import random
from multiprocessing import JoinableQueue, Process

N_PROCESSES = 4
N_WORKERS = 8


def create_queue(zpids: list[int]) -> JoinableQueue:
    queue = JoinableQueue()
    for zpid in zpids:
        queue.put(zpid)
    return queue


async def worker(i_process: int, queue: JoinableQueue):
    # Process items in batches of 200
    query_size = 200
    while True:
        batch = [queue.get(timeout=0.01) for _ in range(min(queue.qsize(), query_size))]
        if not batch:
            break

        logging.info("Faking some tasks...")

        for _ in batch:
            queue.task_done()


async def async_process(i_process: int, queue: JoinableQueue):
    logging.info(f"p:{i_process} - launching workers...")
    workers = [asyncio.create_task(worker(i_process, queue)) for _ in range(N_WORKERS)]
    await asyncio.gather(*workers, return_exceptions=True)


def async_process_wrapper(i_process: int, zpids: JoinableQueue):
    asyncio.run(async_process(i_process, zpids), debug=True)


def start_processes(queue: JoinableQueue):
    for i in range(N_PROCESSES):
        Process(target=async_process_wrapper, args=(i, queue)).start()

    queue.join()


def main():
    data = [random.randrange(1, 1000) for _ in range(200000)]
    my_queue = create_queue(data)
    start_processes(my_queue)


if __name__ == "__main__":
    main()

Solution

  • qsize() is not reliable for multiprocessing queues. Near the end of the queue, queue.get(timeout=0.01) can raise a queue.Empty exception even though qsize() reported items; the exception escapes worker's list comprehension, so task_done() is never executed for the items that were successfully fetched up to that point. The unfinished-task count therefore never drops to zero and queue.join() hangs.

    The following change should fix the problem.

    from queue import Empty
    
    
    async def worker(i_process: int, queue: JoinableQueue):
        # Process items in batches of 200
        query_size = 200
        while True:
            # batch = [queue.get(timeout=0.01) for _ in range(min(queue.qsize(), query_size))]
            batch = []
            try:
                for _ in range(query_size):
                    batch.append(queue.get(timeout=0.01))
            except Empty:
                # qsize() over-reported, so the queue ran dry mid-batch;
                # keep the partial batch so task_done() still runs for every
                # item that was actually taken off the queue
                pass
    
            if not batch:
                break
    
            logging.info("Faking some tasks...")
    
            for _ in batch:
                queue.task_done()
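
    If it helps to see why the original version hangs, here is a small standalone sketch of the failure mode (fake_get is a made-up stand-in for queue.get(timeout=0.01), not part of the code above):

    from queue import Empty


    def fake_get(_state={"remaining": 2}):
        # Hypothetical stand-in for queue.get(timeout=0.01): returns two items and
        # then raises Empty, mimicking qsize() over-reporting near the end of the queue.
        if _state["remaining"] == 0:
            raise Empty
        _state["remaining"] -= 1
        return "item"


    try:
        batch = [fake_get() for _ in range(5)]
    except Empty:
        # The two items already fetched are discarded along with the partial list.
        # In the original worker this exception also ends the coroutine silently
        # (swallowed by gather(return_exceptions=True)), so task_done() never runs
        # for those items and queue.join() can never unblock.
        print("Empty escaped the comprehension; the fetched items were lost")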