Search code examples
Tags: python, multiprocessing, queue, freeze

Subprocess in Python fails to terminate when Queue is utilized


First of all, I am very new to multiprocessing. I am trying to implement a simple camera simulator that generates images in a subprocess and puts them in a queue for another subprocess to process. The camera simulator subprocess (`camera`) hangs when I call `camera.join()` to shut the program down.

import multiprocessing
import queue
import time
import numpy as np

class CameraSimulator(multiprocessing.Process):
    """Simulated camera that generates random RGB frames in a child process.

    While both the ``connected`` and ``acquiring`` events are set, ``run()``
    puts a ``(480, 640, 3)`` uint8 array on ``queue`` roughly every 0.1 s.

    Args:
        queue: Destination for generated frames (e.g. a
            ``multiprocessing.Queue`` shared with a consumer process).
    """

    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        # run() keeps looping only while this is set; cleared by disconnect().
        self.connected = multiprocessing.Event()
        # Frames are produced only while this is set; toggled by acquire()/stop().
        self.acquiring = multiprocessing.Event()

    def run(self):
        """Produce frames until ``connected`` is cleared."""
        while self.connected.is_set():
            # Block on the event instead of spinning while idle. The timeout
            # makes the loop re-check ``connected`` regularly, so disconnect()
            # is honoured whether acquisition is running, paused, or was never
            # started.
            if not self.acquiring.wait(timeout=0.1):
                continue
            # randint's upper bound is exclusive: 256 yields the full 0..255 range.
            image = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)
            # put() with no timeout blocks while the queue is full, so it never
            # raises queue.Full here.
            self.queue.put(image)
            time.sleep(0.1)

    def connect(self):
        """Mark the camera as connected and start the child process.

        Process.start() may only be called once per instance, so neither may
        connect().
        """
        self.connected.set()
        self.start()

    def disconnect(self):
        """Ask run() to exit; it notices within about 0.1 s."""
        self.connected.clear()

    def acquire(self):
        """Start (or resume) frame generation."""
        self.acquiring.set()

    def stop(self):
        """Pause frame generation; the process keeps running until disconnect()."""
        self.acquiring.clear()

if __name__ == "__main__":

    image_queue = multiprocessing.Queue(maxsize=1000)
    camera = CameraSimulator(image_queue)

    print("Connect camera")
    camera.connect()

    print("Start camera acquisition")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()

    time.sleep(2)

    print("Start camera acquisition again")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()

    print("Disconnect camera")
    camera.disconnect()

    print("Draining queue")
    while not image_queue.empty():
        try:
            image_queue.get_nowait()
        except queue.Empty:
            break
    print("Queue drained")

    camera.join()
    print("Camera process terminated")

It seems that the problem might be that the queue is not empty. That's why I deliberately tried to drain the queue, but the problem persists. Could it be that calling `empty()` is not a reliable way to check whether a queue is empty, due to multithreading/multiprocessing semantics? I could use `camera.terminate()` to forcefully terminate the subprocess, but I assume that is not good practice. Any help would be greatly appreciated!


Solution

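  • A process that has put items on a `multiprocessing.Queue` will not exit until its background "feeder" thread has flushed every buffered item into the underlying pipe. The queue is effectively "double buffered": `put()` only appends to an in-process buffer and returns immediately (unless `maxsize` is reached), and the feeder thread writes to the pipe in the background. Your loop only drains what is currently in the pipe — `empty()` cannot see items still waiting in the child's buffer — so the queue must be drained continuously until the child has actually terminated.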

    You can just have a thread drain the queue while you are joining the worker process.

    import queue
    import time
    import numpy as np
    import multiprocessing
    import threading
    
    class CameraSimulator(multiprocessing.Process):
        """Simulated camera that generates random RGB frames in a child process.

        While both the ``connected`` and ``acquiring`` events are set, ``run()``
        puts a ``(480, 640, 3)`` uint8 array on ``queue`` roughly every 0.1 s.

        Args:
            queue: Destination for generated frames (e.g. a
                ``multiprocessing.Queue`` shared with a consumer process).
        """

        def __init__(self, queue):
            # daemon=True: if the parent exits (e.g. after an unhandled
            # exception), it terminates this child instead of leaving it running.
            super().__init__(daemon=True)
            self.queue = queue
            # run() keeps looping only while this is set; cleared by disconnect().
            self.connected = multiprocessing.Event()
            # Frames are produced only while this is set; toggled by acquire()/stop().
            self.acquiring = multiprocessing.Event()

        def run(self):
            """Produce frames until ``connected`` is cleared."""
            while self.connected.is_set():
                # Block on the event instead of spinning while idle. The timeout
                # makes the loop re-check ``connected`` regularly, so disconnect()
                # is honoured whether acquisition is running, paused, or was never
                # started.
                if not self.acquiring.wait(timeout=0.1):
                    continue
                # randint's upper bound is exclusive: 256 yields the full 0..255 range.
                image = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8)
                # put() with no timeout blocks while the queue is full, so it
                # never raises queue.Full; no exception handling is needed.
                self.queue.put(image)
                time.sleep(0.1)

        def connect(self):
            """Mark the camera as connected and start the child process.

            Process.start() may only be called once per instance, so neither
            may connect().
            """
            self.connected.set()
            self.start()

        def disconnect(self):
            """Ask run() to exit; it notices within about 0.1 s."""
            self.connected.clear()

        def acquire(self):
            """Start (or resume) frame generation."""
            self.acquiring.set()

        def stop(self):
            """Pause frame generation; the process keeps running until disconnect()."""
            self.acquiring.clear()
    
    if __name__ == "__main__":

        image_queue = multiprocessing.Queue(maxsize=1000)
        camera = CameraSimulator(image_queue)

        print("Connect camera")
        camera.connect()

        print("Start camera acquisition")
        camera.acquire()
        time.sleep(2)
        print("Stop camera acquisition")
        camera.stop()

        time.sleep(2)

        print("Start camera acquisition again")
        camera.acquire()
        time.sleep(2)
        print("Stop camera acquisition")
        camera.stop()

        print("Disconnect camera")
        camera.disconnect()

        print("Draining queue")
        items = []
        drainer_stop = threading.Event()

        def drainer():
            """Keep pulling images off the queue until drainer_stop is set.

            The child cannot exit until its feeder thread has flushed every
            buffered image into the pipe, so the parent must keep reading while
            it waits in camera.join().
            """
            while not drainer_stop.is_set():
                try:
                    # A modest timeout avoids spinning while still noticing
                    # drainer_stop promptly.
                    items.append(image_queue.get(timeout=0.1))
                except queue.Empty:
                    pass

        drainer_thread = threading.Thread(target=drainer, daemon=True)
        drainer_thread.start()

        print("joining process")
        camera.join()
        print("Camera process terminated")

        drainer_stop.set()
        drainer_thread.join()

        # The drainer may have stopped before reading the last items the child
        # flushed. The child has exited, so nothing new can arrive: collect
        # whatever is still sitting in the pipe.
        while True:
            try:
                items.append(image_queue.get_nowait())
            except queue.Empty:
                break

        print("Queue drained")
        print(f"got {len(items)} items!")
    
    Connect camera
    Start camera acquisition
    Stop camera acquisition
    Start camera acquisition again
    Stop camera acquisition
    Disconnect camera
    Draining queue
    joining process
    Camera process terminated
    Queue drained
    got 38 items!
    

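    If you don't want this behavior, you could terminate the child process instead, but any data still in the queue would be lost (which you may not care about anyway). Be aware that the Python documentation warns that terminating a process while it is using a queue can corrupt the queue. A gentler alternative is to call `queue.cancel_join_thread()` in the child: the child can then exit without waiting for its buffered items to be flushed, and those items are discarded.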