When using a multiprocessing queue to communicate between processes, many articles recommend sending a terminate message through the queue. However, if a child process is the producer, it may fail unexpectedly, leaving the consumer with no notification that no more messages are coming.
However, the parent process is notified when a child dies. It seems it should be possible for the parent to tell a worker thread in its own process to quit and stop expecting messages. But how?
multiprocessing.Queue.close()
... doesn't notify consumers. (Really? Wait... what?!)
def onProcessQuit(): # Notify worker that we are done.
    messageQ.put("TERMINATE")
... doesn't let me wait for pending work to complete.
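An aside on "wait for pending work to complete": if that is the requirement, `multiprocessing.JoinableQueue` (a real class; the names `q`, `worker`, and `seen` here are my own toy example) provides `task_done()`/`join()` for exactly that:

```python
import multiprocessing
import threading

q = multiprocessing.JoinableQueue()
seen = []

def worker():
    while True:
        msg = q.get()
        try:
            if msg == "TERMINATE":
                return
            seen.append(msg)
        finally:
            q.task_done()  # Acknowledge every message, sentinel included.

thr = threading.Thread(target=worker)
thr.start()
q.put("job")
q.put("TERMINATE")
q.join()    # Blocks until task_done() has been called for every put().
thr.join()
```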
def onProcessQuit(): # Notify worker that we are done.
    messageQ.put("TERMINATE")
    # messageQ.close()
    messageQ.join_thread() # Wait for worker to complete
... fails because the queue is not yet closed.
def onProcessQuit(): # Notify worker that we are done.
    messageQ.put("TERMINATE")
    messageQ.close()
    messageQ.join_thread() # Wait for worker to complete
... seems like it should work, but fails in the worker with a TypeError exception:
    msg = messageQ.get()
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 411, in _recv_bytes
    return self._recv(size)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
(Presumably messageQ.close() invalidates the queue's underlying connection handle in this process, and the worker thread, which lives in the same process and is still blocked in get(), then tries to read from a handle that is now None.)
from queue import Empty

while not quit:
    try:
        msg = messageQ.get(block=True, timeout=0.5)
    except Empty:
        continue
... works, but is ugly: it forces a trade-off between shutdown latency and CPU load, since a shorter timeout makes shutdown snappier only by polling more often.
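For contrast, the non-polling shape would look like this minimal sketch (my own toy names): block in get() with no timeout, and drive shutdown with a sentinel plus Thread.join rather than a poll loop.

```python
import multiprocessing
import threading

q = multiprocessing.Queue()
done = []

def worker():
    while True:
        msg = q.get()        # Blocks idle; no CPU spin, no timeout tuning.
        if msg is None:      # Sentinel means: no more messages, ever.
            return
        done.append(msg)

thr = threading.Thread(target=worker)
thr.start()
q.put("hello")
q.put(None)   # Shutdown latency is one queue hop, not a poll interval.
thr.join()    # Wait for pending work by joining the thread, not the queue.
```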
Full example
import multiprocessing
import threading

def producer(messageQ):
    messageQ.put("1")
    messageQ.put("2")
    messageQ.put("3")

if __name__ == '__main__':
    messageQ = multiprocessing.Queue()

    def worker():
        try:
            while True:
                msg = messageQ.get()
                print(msg)
                if msg == "TERMINATE": return
                # messageQ.task_done()
        finally:
            print("Worker quit")
            # messageQ.close()       # End thread
            # messageQ.join_thread()

    thr = threading.Thread(target=worker,
                           daemon=False) # The work queue is precious.
    thr.start()

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE") # Notify worker we are done
        messageQ.close()          # No more messages
        messageQ.join_thread()    # Wait for worker to complete

    def runProcess():
        proc = multiprocessing.Process(target=producer, args=(messageQ,))
        proc.start()
        proc.join()
        print("runProcess quitting ...")
        onProcessQuit()
        print("runProcess quitting .. OK")

    runProcess()
If you are concerned about the producer process not completing normally, then I am not sure what your question is, because your code as is should work except for a few corrections: (1) it is missing an import statement, (2) there is no call to runProcess, and (3) your worker thread is incorrectly a daemon thread (as such it may end up terminating before it has had a chance to process all the messages on the queue).

As a personal preference (not a correction), I would also use None as the special sentinel message instead of TERMINATE, and I would remove some extraneous queue calls that you don't really need (I don't see your explicit closing of the queue accomplishing anything necessary).
These are the changes:
def producer(messageQ):
    messageQ.put("1")
    messageQ.put("2")
    messageQ.put("3")

if __name__ == '__main__':
    import multiprocessing
    import threading

    SENTINEL = None

    def worker():
        try:
            while True:
                msg = messageQ.get()
                if msg is SENTINEL:
                    return # No need to print the sentinel
                print(msg)
        finally:
            print("Worker quit")

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put(SENTINEL) # Notify worker we are done

    def runProcess():
        proc = multiprocessing.Process(target=producer, args=(messageQ,))
        proc.start()
        proc.join()
        print("runProcess quitting ...")
        onProcessQuit()
        print("runProcess quitting .. OK")
        thr.join()

    messageQ = multiprocessing.Queue()
    thr = threading.Thread(target=worker) # The work queue is precious.
    thr.start()
    runProcess()
Prints:
1
2
3
runProcess quitting ...
runProcess quitting .. OK
Worker quit