In Python I have implemented two ways of draining a `multiprocessing.Queue`. The difference: one reads the queue from a separate process, the other from the same process that filled it. There is a performance difference between them that I tried to debug, but I can't see why!

code: queue1.py
```python
import multiprocessing
import time
import cProfile, pstats, io
from queue import Empty

def put_queue(queue):
    for i in range(500000):
        queue.put(i)

def get_queue(queue):
    pr = cProfile.Profile()
    pr.enable()
    print(queue.qsize())
    while queue.qsize() > 0:
        try:
            queue.get(block=False)
        except Empty:
            pass
    pr.disable()
    pr.dump_stats("queue1.prof")
    s = io.StringIO()
    sortby = "cumtime"
    ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    ps.print_stats()
    print(s.getvalue())

q1 = multiprocessing.Queue()
t1 = time.time()
put_queue(q1)
t2 = time.time()
print(t2 - t1)

t1 = time.time()
p1 = multiprocessing.Process(target=get_queue, args=(q1,))
p1.start()
p1.join()
t2 = time.time()
print(t2 - t1)
```
code: queue2.py
```python
import multiprocessing
import time
import cProfile, pstats, io
from queue import Empty

def put_queue(queue):
    for i in range(500000):
        queue.put(i)

def get_queue(queue):
    pr = cProfile.Profile()
    pr.enable()
    print(queue.qsize())
    while queue.qsize() > 0:
        try:
            queue.get(block=False)
        except Empty:
            pass
    pr.disable()
    pr.dump_stats("queue2.prof")
    s = io.StringIO()
    sortby = "cumtime"
    ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
    ps.print_stats()
    print(s.getvalue())

q2 = multiprocessing.Queue()
t1 = time.time()
put_queue(q2)
t2 = time.time()
print(t2 - t1)

t1 = time.time()
get_queue(q2)
t2 = time.time()
print(t2 - t1)
```
Running `python queue2.py` takes longer than running `queue1.py`.

I also printed the profiles: queue2.py spends much of its time in the built-in method `posix.read`.

I want to know exactly why.
I don't know if I can tell you "exactly why" you observe what you see, but I have what I believe is a fairly good explanation, even if it doesn't explain all of it.

First, if you read the documentation on `multiprocessing.Queue`, you will see that the `qsize` method is not reliable and should not be used (if you follow my explanation below of what is occurring, you can readily see why this is so). Let's use a simpler benchmark that abides by the documentation:
```python
from multiprocessing import Queue, Process
import time

N = 500_000

def putter(queue):
    for i in range(N):
        queue.put(i)

def getter(queue):
    for i in range(N):
        queue.get()

def benchmark1(queue):
    t = time.time()
    putter(queue)
    getter(queue)
    elapsed = time.time() - t
    print('benchmark 1 time:', elapsed)

def benchmark2(queue):
    t = time.time()
    putter(queue)
    p = Process(target=getter, args=(queue,))
    p.start()
    p.join()
    elapsed = time.time() - t
    print('benchmark 2 time:', elapsed)

if __name__ == '__main__':
    queue = Queue()
    benchmark1(queue)
    benchmark2(queue)
```
Prints:

```
benchmark 1 time: 19.12191128730774
benchmark 2 time: 8.261705160140991
```
Indeed, we see that the version of the code where `getter` runs in another process is approximately twice as fast.
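As an aside: since `qsize` is unreliable, the usual way to drain a queue without it is to have the producer send a sentinel value marking the end of the data. A minimal sketch (the `SENTINEL` name and `drain` helper are mine, not part of the multiprocessing API):

```python
from multiprocessing import Process, Queue

SENTINEL = None  # conventional end-of-data marker; any unique object works

def drain(queue):
    """Consume items until the sentinel arrives; no qsize() polling."""
    count = 0
    while True:
        item = queue.get()       # blocking get, so no busy-wait loop
        if item is SENTINEL:
            break
        count += 1
    print('items drained:', count)
    return count

if __name__ == '__main__':
    q = Queue()
    for i in range(1000):
        q.put(i)
    q.put(SENTINEL)              # tell the consumer there is no more data
    p = Process(target=drain, args=(q,))
    p.start()
    p.join()
```

This avoids both the unreliable `qsize` check and the non-blocking `get` / `except Empty` busy loop of the original code.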
Explanation
There is clearly a difference in the two cases and I believe the following explains it:
A multiprocessing queue is built on top of a `multiprocessing.Pipe` instance, which has a very limited capacity. If you attempt to send data through the pipe without another thread/process reading on the other end, you will block after having sent very little data. Yet we know that with a queue of unlimited capacity we can do, as in the above benchmarks, 500_000 `put` requests without blocking. What magic makes this possible?
When you create the queue, a `collections.deque` instance is created as part of the queue's implementation. A `put` simply appends to this deque, which has unlimited capacity. In addition, a feeder thread is started that waits for the deque to have data, and it is actually this thread that is responsible for sending the data through the pipe. Thus it is this special thread that blocks on the pipe, while the main thread can continue adding data to the queue (actually, the underlying deque) without blocking.
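The mechanism described above can be sketched as a toy queue (this is an illustrative model, not the real `multiprocessing.Queue` implementation; the class and attribute names here are invented):

```python
import collections
import threading
from multiprocessing import Pipe

class SketchQueue:
    """Toy model: put() only touches an in-process deque; a background
    feeder thread is what actually pushes items into the pipe."""

    def __init__(self):
        self._reader, self._writer = Pipe(duplex=False)
        self._buffer = collections.deque()
        self._notempty = threading.Condition()
        self._feeder = threading.Thread(target=self._feed, daemon=True)
        self._feeder.start()

    def put(self, item):
        # Never blocks on the pipe: just append and wake the feeder.
        with self._notempty:
            self._buffer.append(item)
            self._notempty.notify()

    def _feed(self):
        while True:
            with self._notempty:
                while not self._buffer:
                    self._notempty.wait()
                item = self._buffer.popleft()
            # This send() is the call that can block when the pipe is full.
            self._writer.send(item)

    def get(self):
        return self._reader.recv()

q = SketchQueue()
for i in range(5):
    q.put(i)                      # returns immediately; items sit on the deque
print([q.get() for _ in range(5)])   # prints [0, 1, 2, 3, 4]
```

The real implementation adds locking, pickling, and close/join-thread handling, but the key point is the same: `put()` touches only the deque, so the caller never blocks on the pipe.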
By the time the `putter` function in `benchmark1` returns, essentially all of the "put" data is still sitting on the deque. As the `getter` function running in the main thread starts to get items from the queue, the feeder thread can send more data through the pipe. So we rapidly go back and forth between these two threads, with one of them essentially idle at any moment. Here is a bit of the sequence; for the sake of simplicity, I am assuming that the pipe's capacity is just one item:
- The thread feeding the pipe is blocked on the pipe (not running), waiting for something to read the data out of the pipe.
- The `getter` thread gets an item from the queue (actually, from the underlying pipe), but now the queue is temporarily empty until the feeder thread can take the next item off its deque and send it through the pipe. Until that occurs, the getter is in a wait state.

In reality, the pipe's capacity might be several of these small integer items, so the getter might run and get several items from the queue, allowing the feeder thread to concurrently send a few more without blocking. However, much of the queueing logic is Python bytecode that must acquire the GIL before it can execute. In the multiprocessing case, `benchmark2`, the two threads are running in different processes, so there is no GIL contention. This means items can be moved from the deque through the pipe in parallel with the getter retrieving them, and I believe this accounts for the timing difference.
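The pipe's limited capacity is easy to observe directly: a sender on a raw `Pipe` blocks once the OS buffer fills, while `put` on an unbounded queue never does. A quick sketch (the chunk size and count are arbitrary, just enough to fill a typical pipe buffer):

```python
import threading
from multiprocessing import Pipe, Queue

def flood_pipe(conn, progress):
    # With nobody reading the other end, send() blocks once the
    # OS pipe buffer (typically tens of KB) is full.
    for i in range(10_000):
        conn.send(b'x' * 1024)
        progress.append(i)

reader, writer = Pipe(duplex=False)
progress = []
t = threading.Thread(target=flood_pipe, args=(writer, progress), daemon=True)
t.start()
t.join(timeout=2)
print('pipe sender finished:', not t.is_alive())   # expect False: stuck in send()
print('chunks sent before blocking:', len(progress))

q = Queue()
for i in range(10_000):
    q.put(b'x' * 1024)     # never blocks: items only go onto the deque
q.cancel_join_thread()     # don't wait for the (blocked) feeder thread at exit
print('all queue puts completed')
```

The number of chunks sent before blocking is a rough measure of the pipe's capacity on your platform.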