Today, I stumbled on some frustrating behavior of `multiprocessing.Queue`s. This is my code:
```python
import multiprocessing

def make_queue(size):
    ret = multiprocessing.Queue()
    for i in range(size):
        ret.put(i)
    return ret

test_queue = make_queue(3575)
print(test_queue.qsize())
```
When I run this code, the process exits normally with exit code 0.
However, when I increase the queue size to 3576 or above, it hangs. When I send SIGINT to it via Ctrl-C, it prints this traceback:
```
Exception ignored in atexit callback: <function _exit_function at 0x7f91104f9360>
Traceback (most recent call last):
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/multiprocessing/util.py", line 360, in _exit_function
    _run_finalizers()
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/multiprocessing/util.py", line 300, in _run_finalizers
    finalizer()
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/multiprocessing/util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/multiprocessing/queues.py", line 199, in _finalize_join
    thread.join()
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File "/home/captaintrojan/.conda/envs/example_env/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt:
```
Can anyone please explain this behavior? I've experimented with the sizes: from a sample of about 40 different sizes, any size at or below 3575 works fine, and any size above 3575 hangs the process. I figured it may have something to do with the queue size in bytes, because if I insert `i*i` or some random strings instead of `i`, the threshold changes. Note that, unless `multiprocessing.Queue` does something suspicious in the background, I don't create any processes other than the main process. Also, adding `test_queue.close()` has no impact on the outcome.
> I understand what happens in the SO post you've linked. That makes sense to me. Queues have limited size.

Not exactly. Queues are simply a higher-level implementation of pipes that allows multiple readers and writers simultaneously. It is these underlying pipes that have a limited size, and that limit is abstracted away by the queue implementation.
> However, I still don't understand why that is; as in, why would anyone design the Queue to behave this way? It makes no sense, especially if there is only ever a single process involved.

It is more of a design consequence than a design choice, and I'll touch more on this later. As to why queues behave this way even when you're only moving data within a single process: a queue simply cannot know which process will be the one consuming items. If you're only ever going to put and retrieve items from a single process, then you shouldn't be using a `multiprocessing.Queue` anyway, since it's specifically designed for inter-process communication (consider using `queue.Queue` instead).
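For instance, the same single-process pattern with a plain `queue.Queue` (a sketch, assuming your producer and consumer really do live in one process) involves no pipe and no feeder thread, so it exits cleanly at any size:

```python
import queue

def make_queue(size):
    # queue.Queue is a purely in-memory, thread-safe queue:
    # there is no pipe and no feeder thread, so nothing needs
    # to be flushed or joined when the process exits.
    ret = queue.Queue()
    for i in range(size):
        ret.put(i)
    return ret

# Far beyond the ~3575 threshold seen with multiprocessing.Queue.
test_queue = make_queue(100000)
print(test_queue.qsize())
```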
> Also, as you've said, there is still the anomaly that the process doesn't hang/sleep if the queue is sufficiently small. Perhaps the data is stored in a simple buffer until it gets too large?
The buffer is implemented as a `collections.deque`, and by default these are unbounded (restricted only by the physical memory your machine has). The problem happens when the background "feeder" thread attempts to flush the data from the buffer into the pipe (it is only after this that the data is actually in the "queue"). When the pipe becomes full, all calls that put data into the pipe (through the method `send` and its equivalent `send_bytes`) will block until the other end of the pipe removes some data (this happens internally when you call `queue.get()`). Once the feeder thread is blocked like this, it can no longer respond to the exit condition set by the main thread (which signals it by putting a sentinel at the end of the buffer when the queue is being garbage collected).

This is important because even though the feeder thread is daemonic, the main thread will attempt to join it by default so the queue can be closed gracefully. And because the feeder thread is stuck attempting to empty the buffer, the join will never complete until someone calls `queue.get()` enough times that the buffer can be emptied.
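If you genuinely want the process to exit without ever draining the queue, there is an escape hatch: `Queue.cancel_join_thread()` tells the queue's finalizer not to join the feeder thread at exit. This is a sketch, not a recommendation, because any data still sitting in the buffer or pipe is silently lost:

```python
import multiprocessing

def make_queue(size):
    ret = multiprocessing.Queue()
    for i in range(size):
        ret.put(i)
    # Tell the queue's finalizer not to join the feeder thread at exit.
    # Any data still sitting in the internal buffer may be lost.
    ret.cancel_join_thread()
    return ret

test_queue = make_queue(100000)  # far past the hanging threshold
print('exited cleanly')
```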
This also explains the anomaly in your case. Basically, if the total size of the items you put in a queue is less than or equal to the pipe's capacity, the feeder thread can flush the buffer right away without waiting for someone to call `queue.get()`, so the process will not hang even if you never read from the queue. This is essentially what the answer I linked to was trying to explain.
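To see the flip side end-to-end, here's a sketch where draining the queue from the same process unblocks the feeder thread, so the buffer empties and the exit-time join succeeds even for a large queue (10000 is an arbitrary count assumed to exceed a typical pipe buffer):

```python
import multiprocessing

q = multiprocessing.Queue()
n = 10000  # enough small ints to overflow a typical pipe buffer

for i in range(n):
    q.put(i)

# Each get() removes data from the pipe, which lets the blocked
# feeder thread flush more of the buffer; after n gets the buffer
# is empty, so the join at interpreter exit completes normally.
total = sum(q.get() for _ in range(n))
print(total)  # sum(range(10000))
```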
If it's all still too abstract, then you can use a version of your code to replicate how the feeder thread gets blocked and to find the maximum size of the pipes on your machine. Hopefully, this helps clear things up a bit:
```python
import multiprocessing

def make_pipe(size):
    """
    Check how much data a pipe can hold before blocking.
    Try passing a high value for size.
    """
    w, r = multiprocessing.Pipe()
    for i in range(1, size):
        print(i)
        # If size is high enough, this send will eventually block
        # because nobody reads from the other end, and the process hangs.
        w.send(i)
    return w, r

w, r = make_pipe(10000)
print('done')
```