I was able to reliably reproduce the error with this simple test case (Python 3.7, Linux):
    from multiprocessing import Manager, Process
    from threading import Thread
    from time import sleep


    def test_process():
        def listener(q):
            # Daemonic thread that blocks on the shared queue.
            t = Thread(target=lambda x: print(x.get(True)), args=(q, ), daemon=True)
            t.start()

        def publisher(q):
            q.put('test')

        with Manager() as mp:
            q = mp.Queue()
            a = Process(target=listener, args=(q, ), daemon=True)
            a.start()
            sleep(2)
            b = Process(target=publisher, args=(q, ), daemon=True)
            b.start()
            a.kill()
            b.kill()
I run the test with pytest -k test_process -vv. I can see it log an error:
exception in thread serving 'Process-2|Thread-1'
... message was ('#RETURN', 'test')
... exception was BrokenPipeError(32, 'Broken Pipe')
Note: this is a logged error, not a raised exception. You can see it if you use multiprocessing.get_logger().
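For anyone trying to see the same message, here is a minimal sketch of one way to surface it. It assumes the logger is configured before the Manager is created, so that the manager process inherits the handler (true with the fork start method on Linux). As far as I can tell, INFO is enough for the lines quoted above, but treat the level as an assumption and lower it to DEBUG if nothing shows up.

```python
import logging
import multiprocessing

# log_to_stderr() attaches a stderr handler to the logger
# that multiprocessing.get_logger() returns, and returns that logger.
logger = multiprocessing.log_to_stderr()

# Assumption: the manager's "exception in thread serving" lines are
# emitted at INFO level; use logging.DEBUG if they do not appear.
logger.setLevel(logging.INFO)
```

Under pytest the output is captured by default, so running with -s may be needed to see it.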
How can I reliably get from a shared Queue in a thread inside a Process, regardless of the order in which I start the processes? The test case is convoluted because it emulates a real-world case that I am currently working on.
From the discussion in the comments:
Ok, if I put a while loop in the thread so that it gets from the queue, it still gives me the error but when I run it with .join(), it doesn't. What gives? I am more likely to be running it like the first scenario as a daemonic thread with a while loop
I already explained the reason: your thread is being killed the moment it spawns because a) it's daemonic and b) the target function that needs to be run exits right after starting the thread.
If you are unable to connect the dots, here's a more concrete picture: after the line t.start() is executed, there are no more commands to be executed and nothing stopping the child process from quitting. This would not have been the case if the thread you started was non-daemonic, since then the process would have to wait for it to finish. But because the thread is daemonic, the process quits regardless of the state of the thread.
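The difference can be seen in isolation with a small sketch. The names child, work and thread_finished are made up for illustration, and the "fork" context is an assumption that matches the Linux setup in the question. A child process starts a thread that sets a shared flag after a short sleep, and the target function returns immediately. A non-daemonic thread is waited for when the process exits (Python 3.7 and later), so the flag gets set. A daemonic thread does not keep the process alive, so it is cut off before it can set the flag.

```python
import multiprocessing as mp
import time
from threading import Thread

# Assumption: Linux, as in the question, so the "fork" start method is available.
ctx = mp.get_context("fork")


def child(flag, daemon):
    def work():
        time.sleep(0.5)
        flag.value = 1  # only reached if the process is still alive

    Thread(target=work, daemon=daemon).start()
    # The target function returns right here, just like listener() in the question.


def thread_finished(daemon):
    """Return True if the thread inside the child process ran to completion."""
    flag = ctx.Value('i', 0)
    p = ctx.Process(target=child, args=(flag, daemon))
    p.start()
    p.join()
    return bool(flag.value)


if __name__ == "__main__":
    print("non-daemonic thread finished:", thread_finished(False))
    print("daemonic thread finished:", thread_finished(True))
```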
So how does it relate to a BrokenPipeError? Well, this is because when you called x.get() from inside a thread, it opened a connection to the manager process. The manager accepted this connection and ran the get() function on the queue stored within. Since this get() method call blocks, the manager process waited for data to arrive. The connection remained open all this while.
After it received the data (when you ran b.start()), it attempted to communicate back to the process that requested the function to be executed (the one with the thread) on the connection that was open. But because that process had already exited, there was no one on the receiving end. This led to our BrokenPipeError.
In conclusion, it matters little that our thread is running an infinite loop: if you pull its cord, it's going to die. That's essentially what happened to our thread here. So, to fix this, you need to wait for the thread to finish, either by making it non-daemonic or by explicitly joining it. There is simply no other way.
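A minimal sketch of that fix, under a few assumptions: the "fork" context matches the Linux setup in the question, the second queue out and the helper run_once are made up here so the result can be checked from the parent, and the processes are joined instead of killed. The listener's thread is non-daemonic and is explicitly joined, so the process stays alive until get() returns and the manager always has someone to reply to.

```python
import multiprocessing as mp
from threading import Thread

# Assumption: Linux, as in the question, so the "fork" start method is available.
ctx = mp.get_context("fork")


def listener(q, out):
    # Non-daemonic thread, joined explicitly: the process cannot exit
    # while the blocking get() is still waiting on the manager.
    t = Thread(target=lambda: out.put(q.get(True)))
    t.start()
    t.join()


def publisher(q):
    q.put('test')


def run_once():
    """Start listener and publisher, return what the listener received."""
    with ctx.Manager() as manager:
        q = manager.Queue()
        out = manager.Queue()  # hypothetical result queue, for illustration only
        a = ctx.Process(target=listener, args=(q, out))
        a.start()
        b = ctx.Process(target=publisher, args=(q,))
        b.start()
        # Wait for both processes instead of killing them.
        a.join()
        b.join()
        return out.get(timeout=5)


if __name__ == "__main__":
    print(run_once())
```

Either measure alone (non-daemonic, or an explicit join) should be enough; the sketch uses both to make the intent obvious.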