First of all, I am very new to multiprocessing. I am trying to implement a simple camera simulator that generates images in a subprocess and puts them in a queue for another subprocess to process. The initialized camera simulator subprocess (camera) hangs when I call the camera.join() method to finalize the program.
import multiprocessing
import queue
import time
import numpy as np
class CameraSimulator(multiprocessing.Process):
    """Child process that simulates a camera by queueing random images.

    The parent controls the process through two ``multiprocessing.Event``
    flags: ``connected`` keeps the process alive and ``acquiring`` enables
    image generation. Each generated image is a 480x640x3 ``uint8`` array
    that is put on the queue passed to the constructor.
    """

    # Seconds to sleep after each generated frame.
    FRAME_PERIOD_S = 0.1
    # Longest time spent waiting for acquisition to start before
    # ``connected`` is checked again.
    IDLE_POLL_S = 0.1

    def __init__(self, queue):
        """Create the simulator; ``queue`` receives the generated images."""
        super().__init__()
        self.queue = queue
        self.connected = multiprocessing.Event()
        self.acquiring = multiprocessing.Event()

    def run(self):
        """Generate frames while acquiring; return once disconnected."""
        while self.connected.is_set():
            # Block on the event instead of spinning, so an idle camera does
            # not burn a CPU core, and re-check ``connected`` on every pass so
            # disconnect() is honoured even while acquisition is enabled.
            if not self.acquiring.wait(timeout=self.IDLE_POLL_S):
                continue
            image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
            try:
                # Non-blocking: a blocking put() never raises queue.Full and
                # would stall this loop forever on a full queue.
                self.queue.put(image, block=False)
            except queue.Full:
                # Queue is full: drop this frame, but still pace the loop.
                pass
            time.sleep(self.FRAME_PERIOD_S)

    def connect(self):
        """Mark the camera as connected and start the child process."""
        self.connected.set()
        self.start()

    def disconnect(self):
        """Clear ``connected`` so that run() returns."""
        self.connected.clear()

    def acquire(self):
        """Enable image generation."""
        self.acquiring.set()

    def stop(self):
        """Pause image generation; the process itself keeps running."""
        self.acquiring.clear()
if __name__ == "__main__":
    # Images travel from the camera process to this one through a bounded queue.
    image_queue = multiprocessing.Queue(maxsize=1000)
    camera = CameraSimulator(image_queue)

    print("Connect camera")
    camera.connect()

    # First acquisition burst, followed by an idle pause.
    print("Start camera acquisition")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()
    time.sleep(2)

    # Second acquisition burst.
    print("Start camera acquisition again")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()

    print("Disconnect camera")
    camera.disconnect()

    print("Draining queue")
    # NOTE(review): this is a single pass over whatever has reached this
    # process so far. A child that has put items on a multiprocessing.Queue
    # does not exit until its buffered items are flushed to the pipe, so the
    # join() below can block if the child still holds unflushed images.
    while True:
        if image_queue.empty():
            break
        try:
            image_queue.get_nowait()
        except queue.Empty:
            break
    print("Queue drained")

    camera.join()
    print("Camera process terminated")
It seems that the problem might be that the queue is not empty. That's why I deliberately tried to drain the queue, but the problem still persists. Could it be that calling empty() is not a reliable way to check whether a queue is empty, due to multithreading/multiprocessing semantics? I could use camera.terminate() to forcefully terminate the subprocess, but I assume that is not good practice. Any help would be greatly appreciated!
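Python blocks the child process from exiting until the queue is drained, but the queue is "double buffered" (so that put() is non-blocking): items first go into a buffer inside the child process and are written to the underlying pipe in the background.
Your loop only drains what has reached the parent at that moment, not what is still sitting in the child's buffer, so the queue needs to be drained continuously until the child terminates.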
You can just have a thread drain the queue while you are joining the worker process.
import queue
import time
import numpy as np
import multiprocessing
import threading
class CameraSimulator(multiprocessing.Process):
    """Simulated camera that produces random frames in a daemon child process.

    Two ``multiprocessing.Event`` flags drive the process from the parent:
    ``connected`` keeps ``run()`` looping and ``acquiring`` turns frame
    generation on and off. Frames are 480x640x3 ``uint8`` arrays put on the
    queue given to the constructor.
    """

    # Seconds to sleep after each generated frame.
    FRAME_PERIOD_S = 0.1
    # Longest time spent waiting for acquisition to start before
    # ``connected`` is checked again.
    IDLE_POLL_S = 0.1

    def __init__(self, queue):
        """Create the simulator; ``queue`` receives the generated frames."""
        # if an exception happens, don't leave a zombie process
        super().__init__(daemon=True)
        self.queue = queue
        self.connected = multiprocessing.Event()
        self.acquiring = multiprocessing.Event()

    def run(self):
        """Produce frames while acquiring; return once disconnected."""
        while self.connected.is_set():
            # Wait on the event rather than busy-looping while idle, and
            # re-check ``connected`` every pass so that disconnect() takes
            # effect even if acquisition was never stopped.
            if not self.acquiring.wait(timeout=self.IDLE_POLL_S):
                continue
            image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
            try:
                # Non-blocking put: the blocking form never raises queue.Full
                # and would hang this loop on a full queue.
                self.queue.put(image, block=False)
            except queue.Full:
                # Drop the frame; fall through so the sleep is not skipped.
                pass
            time.sleep(self.FRAME_PERIOD_S)

    def connect(self):
        """Set ``connected`` and start the child process."""
        self.connected.set()
        self.start()

    def disconnect(self):
        """Clear ``connected`` so that run() returns."""
        self.connected.clear()

    def acquire(self):
        """Start (or resume) frame generation."""
        self.acquiring.set()

    def stop(self):
        """Pause frame generation without ending the process."""
        self.acquiring.clear()
if __name__ == "__main__":
    image_queue = multiprocessing.Queue(maxsize=1000)
    camera = CameraSimulator(image_queue)

    print("Connect camera")
    camera.connect()

    print("Start camera acquisition")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()
    time.sleep(2)

    print("Start camera acquisition again")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()

    print("Disconnect camera")
    camera.disconnect()

    print("Draining queue")
    items = []
    drainer_stop = threading.Event()

    def drainer():
        """Move queued images into ``items`` until told to stop and empty.

        The child cannot exit while it still holds unflushed queue data, so
        the queue has to be read concurrently with ``camera.join()``.
        """
        while True:
            # Sample the flag *before* reading: only a read that started
            # after the stop request and still came back empty shows that
            # nothing is left. Exiting on the flag alone could leave behind
            # items that arrived just before the child exited.
            stop_requested = drainer_stop.is_set()
            try:
                items.append(image_queue.get(timeout=0.001))
            except queue.Empty:
                if stop_requested:
                    break

    drainer_thread = threading.Thread(target=drainer, daemon=True)
    drainer_thread.start()

    print("joining process")
    camera.join()
    print("Camera process terminated")

    # The child has exited, so no new items can appear; let the drainer
    # finish reading what is already in the pipe.
    drainer_stop.set()
    drainer_thread.join()
    print("Queue drained")
    print(f"got {len(items)} items!")
Connect camera
Start camera acquisition
Stop camera acquisition
Start camera acquisition again
Stop camera acquisition
Disconnect camera
Draining queue
joining process
Camera process terminated
Queue drained
got 38 items!
If you don't want this behavior, you can probably just terminate the child process, but any data still in the queue could be lost, which I don't think you care about anyway.