
Why does reading from a queue passed to a multiprocessing.Process() run faster than reading the same queue with a plain function call?


In Python I have implemented two ways of reading from a queue.

The difference:

  1. the queue is created and read in the main process
  2. the queue is created in the main process and read by another process.

There is a performance difference between the two. I tried to debug it, but I can't see why!

code: queue1.py

import multiprocessing
import time
import cProfile, pstats, io
from queue import Empty  # multiprocessing.Queue.get raises queue.Empty when empty


def put_queue(queue):
    for i in range(500000):
        queue.put(i)

def get_queue(queue):
    pr = cProfile.Profile()
    pr.enable()
    print(queue.qsize())
    while queue.qsize() > 0:
        try:
            queue.get(block=False)
        except Empty:  # catch only the expected exception
            pass
    pr.disable()  # stop profiling before writing out the stats
    pr.dump_stats("queue1.prof")
    s = io.StringIO()
    sortby = "cumtime"
    ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    ps.print_stats()
    print(s.getvalue())


q1 = multiprocessing.Queue()
t1 = time.time()
put_queue(q1)
t2 = time.time()
print(t2-t1)

t1 = time.time()
p1 = multiprocessing.Process(target=get_queue, args=(q1,))
p1.start()
p1.join()
t2 = time.time()
print(t2-t1)

queue2.py

import multiprocessing
import time
import cProfile, pstats, io
from queue import Empty  # multiprocessing.Queue.get raises queue.Empty when empty


def put_queue(queue):
    for i in range(500000):
        queue.put(i)


def get_queue(queue):
    pr = cProfile.Profile()
    pr.enable()
    print(queue.qsize())
    while queue.qsize() > 0:
        try:
            queue.get(block=False)
        except Empty:  # catch only the expected exception
            pass
    pr.disable()  # stop profiling before writing out the stats
    pr.dump_stats("queue2.prof")
    s = io.StringIO()
    sortby = "cumtime"
    ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    ps.print_stats()
    print(s.getvalue())


q2 = multiprocessing.Queue()
t1 = time.time()
put_queue(q2)
t2 = time.time()
print(t2 - t1)

t1 = time.time()
get_queue(q2)
t2 = time.time()
print(t2 - t1)

Running python queue2.py takes noticeably longer than python queue1.py.

I also printed the profiling output (screenshot omitted).

queue2.py spends much of its time in the built-in method posix.read.

I want to know exactly why.


Solution

  • I don't know if I can tell you "exactly why" you observe what you see, but I have what I believe is a fairly good explanation, even if it doesn't account for everything.

    First, if you read the documentation on multiprocessing.Queue, you will see that the qsize method is not reliable and should not be used (if you follow my explanation below of what is occurring, you will readily see why this is so). Let's use a simpler benchmark that abides by the documentation:

    from multiprocessing import Queue, Process
    import time
    
    N = 500_000
    
    def putter(queue):
        for i in range(N):
            queue.put(i)
    
    def getter(queue):
        for i in range(N):
            queue.get()
    
    def benchmark1(queue):
        t = time.time()
        putter(queue)
        getter(queue)
        elapsed = time.time() - t
        print('benchmark 1 time:', elapsed)
    
    
    def benchmark2(queue):
        t = time.time()
        putter(queue)
        p = Process(target=getter, args=(queue,))
        p.start()
        p.join()
        elapsed = time.time() - t
        print('benchmark 2 time:', elapsed)
    
    if __name__ == '__main__':
        queue = Queue()
        benchmark1(queue)
        benchmark2(queue)
    

    Prints:

    benchmark 1 time: 19.12191128730774
    benchmark 2 time: 8.261705160140991
    

    Indeed we see that the version of code where getter is running in another process is approximately twice as fast.

    Explanation

    There is clearly a difference in the two cases and I believe the following explains it:

    A multiprocessing queue is built on top of a multiprocessing.Pipe instance, which has a very limited capacity. If you attempt to send data through the pipe without another thread/process reading on the other end, you will soon block after having sent very little data. Yet we know that with a queue of unlimited capacity we can do, as in the above benchmarks, 500_000 put requests without blocking. What magic makes this possible?
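    The pipe's limited capacity is easy to see with a quick sketch (POSIX-only; the exact figure is an OS detail, commonly 64 KiB on Linux): write into a non-blocking pipe with no reader until the kernel buffer fills.

    ```python
    import os

    # POSIX-only sketch: fill a raw pipe that has no reader attached and
    # see how little data fits before a blocking writer would stall.
    r, w = os.pipe()
    os.set_blocking(w, False)  # non-blocking writes so the demo can't hang

    total = 0
    try:
        while True:
            total += os.write(w, b"x" * 4096)
    except BlockingIOError:
        pass  # the kernel pipe buffer is full

    print("pipe capacity: ~%d bytes" % total)
    os.close(r)
    os.close(w)
    ```

    That buffer is tiny compared to what 500_000 pickled integers would need, so something else must be absorbing the puts.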

    When you create the queue, a collections.deque instance is created as part of the queue's implementation. A put simply appends to this deque, which has unlimited capacity. In addition, a thread is started that waits for the deque to have data; it is actually this "feeder" thread that is responsible for sending the data through the pipe. Thus it is this feeder thread that blocks on the pipe, while the main thread can continue adding data to the queue (actually, to the underlying deque) without blocking.
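    That buffering is easy to observe (a minimal sketch; the item count is just illustrative): with no reader attached anywhere, a large number of put calls still return almost immediately, because each one only appends to the internal deque.

    ```python
    import multiprocessing
    import time

    if __name__ == "__main__":
        q = multiprocessing.Queue()
        t = time.time()
        for i in range(100_000):
            q.put(i)  # appends to an internal collections.deque
        elapsed = time.time() - t
        # No get() has happened anywhere, yet none of the puts blocked.
        print("100_000 puts finished in %.3f s" % elapsed)
        q.cancel_join_thread()  # don't wait for the feeder thread at exit
    ```

    Only the feeder thread ever touches the pipe, so the puts never feel its tiny capacity.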

    By the time the putter function in benchmark1 returns, essentially all of the "put" data is still sitting in the deque. As the getter function running in the main thread starts to get items from the queue, the feeder thread can send more data through the pipe. So we bounce rapidly back and forth between these two threads, with one of them essentially idle at any moment. Here is a bit of the sequence. For the sake of simplicity, I am assuming that the pipe's capacity is just one item:

    1. The feeder thread is blocked on the pipe (not running), waiting for something to take the data out of the pipe.
    2. The getter thread gets an item from the queue (actually from the underlying pipe), but now the pipe is temporarily empty until the feeder can take the next item from its deque and send it through the pipe. Until that occurs, the getter is in a wait state.
    3. The feeder "wakes up", being no longer blocked on the pipe, and can now send another item through the pipe. But then it blocks again until the getter is dispatched and can read the item from the pipe.

    In reality, the pipe's capacity might hold several of these small integer items, so the getter might run and get several items from the queue while the feeder concurrently sends a few more without blocking. However, much of the queueing logic is Python bytecode that must acquire the GIL before it can execute. In the multiprocessing case, benchmark2, the two sides run in different processes, so there is no GIL contention. This means items can be moved from the deque through the pipe in parallel with the getter retrieving them, and I believe this accounts for the timing difference.
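    One way to see the GIL's role is to run the getter in a thread of the same process instead of a separate process (a sketch; absolute timings will vary by machine). The feeder thread and the getter thread now share one GIL, so this version tends to land near the slow benchmark1 timing rather than the fast benchmark2 one:

    ```python
    from multiprocessing import Queue
    from threading import Thread
    import time

    N = 100_000

    def getter(queue):
        for _ in range(N):
            queue.get()

    if __name__ == "__main__":
        queue = Queue()
        for i in range(N):
            queue.put(i)
        t = time.time()
        th = Thread(target=getter, args=(queue,))  # same process, shared GIL
        th.start()
        th.join()
        print("threaded getter time: %.3f s" % (time.time() - t))
    ```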