
Python multiprocessing manager.Queue() with 8 processes is as fast as 1 process


I have a queue with 100,000 elements that I simply want to drain, one item at a time, using multiple processes, and then print how long it took at the end:

import multiprocessing
from datetime import datetime

def worker(queue):
    while not queue.empty():
        try:
            item = queue.get_nowait()
            # Process the item
            print(f"Process {multiprocessing.current_process().name} processing item: {item}")
        except:
            break

def main():
    # Create a queue with 100,000 elements using Manager
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    for i in range(100000):
        queue.put(i)

    # Starttime
    start=datetime.now()

    # Create 8 worker processes
    processes = []
    for _ in range(NR_WORKERS):
        p = multiprocessing.Process(target=worker, args=(queue,))
        processes.append(p)
        p.start()

    # Wait for all processes to finish
    for p in processes:
        p.join()

    # Endtime
    print(queue.empty(), "all processes finished in:", datetime.now()-start)

if __name__ == "__main__":
    global NR_WORKERS
    NR_WORKERS = 8
    main()

However, when I execute the code with 8 processes it is almost as fast as with 1 process (15 versus 18 seconds), and without the print statement both variants are equally fast (around 4 seconds). How can that be, even though I'm using more processes? Is there a way to improve this?


Solution

  • Your code as posted is not a good candidate for multiprocessing, which works best when your worker function (worker in this case) is more CPU-intensive than it currently is, so that the overhead of using multiprocessing is more than offset by the parallelism you gain (see the sketch just below for what such a CPU-bound worker might look like).
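
    For illustration only, a worker along the following lines would be CPU-bound enough for multiple processes to pay off. This is a minimal sketch; the squaring loop is a hypothetical stand-in for whatever real per-item work you do:

    def worker(queue):
        while not queue.empty():
            try:
                item = queue.get_nowait()
            except:
                break
            # Hypothetical CPU-bound work standing in for real processing:
            total = 0
            for k in range(10_000):
                total += (item + k) ** 2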

    But assuming your actual worker function is more CPU-intensive than what you posted, you can achieve a large performance boost by using a multiprocessing.Queue instance instead of a managed queue. If you do so, be aware that the call to queue.empty() is no longer reliable. Therefore, your main process should put an additional NR_WORKERS sentinel objects on the queue (one for each child process) that cannot be mistaken for actual data and that signify there is no more data to be gotten. When a child process gets one of these sentinels, it knows to terminate. In this case None is a suitable sentinel value.

    If you are going to time your run, then start the clock at the beginning of main so that you also measure the non-trivial time it takes to put 100_000 integers onto a managed queue. If you do so, the total running time becomes 19.75 seconds on my desktop. Using a multiprocessing.Queue instance instead reduces the running time to 2.1 seconds:

    import multiprocessing
    from datetime import datetime
    
    def worker(queue):
        while True:
            try:
                item = queue.get_nowait()
                if item is None: # Sentinel
                    break
                # Process the item
                #print(f"Process {multiprocessing.current_process().name} processing item: {item}")
            except:
                # Queue appears empty or an error occurred; terminate
                break
    
    
    def main():
        # Starttime
        start = datetime.now()
        # Create a queue with 100,000 elements:
        queue = multiprocessing.Queue()
        for i in range(100000):
            queue.put(i)
        # Add sentinel values:
        for _ in range(NR_WORKERS):
            queue.put(None)
    
        # Create NR_WORKERS worker processes
        processes = []
        for _ in range(NR_WORKERS):
            p = multiprocessing.Process(target=worker, args=(queue,))
            processes.append(p)
            p.start()
    
        # Wait for all processes to finish
        for p in processes:
            p.join()
    
        # Endtime
        print("All processes finished in:", datetime.now() - start)
    
    if __name__ == "__main__":
        global NR_WORKERS
        NR_WORKERS = 8
        main()
    

    If you need your worker function to return a result, I would advise you to just use a multiprocessing pool instead (not a bad idea even in your current example). If you do continue to use explicit multiprocessing.Process child processes, you will need a second, output queue for the results. Be aware that you have no control over the order in which the child processes put their results on the result queue, so if you need the results back in submission order, put the index of each item on the input queue along with the item and have the child process return that index together with the result. You must also get the results from the result queue before joining the child processes. If you know that every worker invocation will successfully produce a result, you can simply count the number of results the main process retrieves. Otherwise, as in the example below, a child process that may terminate on an error should always put a sentinel value on the result queue when it terminates:

    import multiprocessing
    from datetime import datetime
    
    def worker(input_queue, output_queue):
        while True:
            try:
                item = input_queue.get_nowait()
                if item is None: # Sentinel
                    break
                # Process the item
                index, n = item # unpack
                output_queue.put((index, n ** 2)) # return tuple of input index and result
            except:
                # An error occurred; terminate (the sentinel below is still put)
                break
    
        output_queue.put(None) # This process has put its final item
    
    def main():
        # Starttime
        start = datetime.now()
        input_queue = multiprocessing.Queue()
        output_queue = multiprocessing.Queue()
        arguments = [
            1, 3, 5, 7, 2, 4, 6, 100, 1_000, 1_000_000
        ]
        for index, n in enumerate(arguments):
            input_queue.put((index, n))
        # Add sentinel values:
        for _ in range(NR_WORKERS):
            input_queue.put(None)
    
        # Create NR_WORKERS worker processes
        processes = []
        for _ in range(NR_WORKERS):
            p = multiprocessing.Process(target=worker, args=(input_queue, output_queue))
            processes.append(p)
            p.start()
    
        # Allocate list for results:
        results = [None] * len(arguments)
    
        sentinels_seen = 0
        while sentinels_seen < NR_WORKERS:
            result = output_queue.get()
            if result is None: # sentinel?
                sentinels_seen += 1
            else:
                index, return_value = result # Unpack tuple
                results[index] = return_value
    
        # Now we can wait for all processes to finish
        for p in processes:
            p.join()
    
        # Endtime
        print("All processes finished in:", datetime.now() - start)
        print(results)
    
    if __name__ == "__main__":
        global NR_WORKERS
        NR_WORKERS = 8
        main()
    

    Prints:

    All processes finished in: 0:00:00.228998
    [1, 9, 25, 49, 4, 16, 36, 10000, 1000000, 1000000000000]
    

    Using a multiprocessing pool:

    import multiprocessing
    from datetime import datetime
    
    def worker(n):
        return n ** 2
    
    def main():
        # Starttime
        start = datetime.now()
        arguments = [
            1, 3, 5, 7, 2, 4, 6, 100, 1_000, 1_000_000
        ]
    
        with multiprocessing.Pool(NR_WORKERS) as pool:
            results = pool.map(worker, arguments)
    
        # Endtime
        print("All processes finished in:", datetime.now() - start)
        print(results)
    
    if __name__ == "__main__":
        global NR_WORKERS
        NR_WORKERS = 8
        main()
    

    Prints:

    All processes finished in: 0:00:00.226000
    [1, 9, 25, 49, 4, 16, 36, 10000, 1000000, 1000000000000]
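
    As suggested above, a pool is also a reasonable fit for the original 100,000-item case. The following is a minimal sketch under the assumption that the per-item work is genuinely CPU-bound (process_item is a hypothetical placeholder); the chunksize argument hands each worker a batch of items per transfer, which reduces inter-process communication overhead when there are many small tasks:

    import multiprocessing
    from datetime import datetime

    def process_item(n):
        # Hypothetical placeholder for real, CPU-bound per-item work:
        return sum((n + k) ** 2 for k in range(10_000))

    def main():
        # Starttime
        start = datetime.now()

        # Each chunk of 1,000 items is sent to a worker process in one transfer,
        # so the IPC overhead is amortized over many items.
        with multiprocessing.Pool(NR_WORKERS) as pool:
            results = pool.map(process_item, range(100_000), chunksize=1_000)

        # Endtime
        print("All processes finished in:", datetime.now() - start)

    if __name__ == "__main__":
        NR_WORKERS = 8
        main()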