python multithreading multiprocessing python-multiprocessing python-multithreading

Cleanly terminating subprocesses that are running and communicating via queues


I am working on a larger project in which I have two threads (in the same process) and one separate process. One thread is the GUI, the other is a sentinel thread that observes the subprocess, and the subprocess does some heavy lifting with neural networks. The architecture looks roughly like this:

Communication Architecture

I need to be able to cancel the neural-network process and, correspondingly, end the sentinel thread. I have created a small example that shows the general architecture and what I am trying to do.

from multiprocessing import Process, Queue
from threading import Thread
from time import sleep
 
 
class Worker(Process):
    # The worker resembles the neural network. It does some calculations and shares
    # the information via the queue.
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
 
    def run(self):
        i = 0
        while True:
            self.queue.put(i)
            i += 1
 
    def stop(self):
        # I used the stop function for trying out some things, like using a joinable 
        # queue and block execution as long as the queue is not empty, which is not 
        # working
        self.queue.put(None)
        self.terminate()
 
 
class Listener(Thread):
    # This class resembles the sentinel thread. It checks in an infinite loop for
    # messages. In the real application I send signals via the signals and slots
    # design pattern to the gui and display the sent information.
 
    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)
 
    def run(self):
        self.worker.start()
        while True:
            data = self.queue.get()
            if data is not None:
                print(data)
            else:
                break
        print("broken")
 
    def stop(self):
        self.worker.stop()
 
 
class System:
    # This class resembles the gui
 
    def __init__(self):
        self.listener = Listener()
 
    def start(self):
        self.listener.start()
 
    def stop(self):
        self.listener.stop()
 
 
if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()

What is the problem?

As long as a process reads from or writes to the queue, and/or the queue is not emptied properly, one or both of the processes become zombie processes, which is basically a deadlock in some sense. I therefore need a way to handle the queue properly when terminating the process, so that the processes terminate without errors.

What I have tried so far:

  1. Using a JoinableQueue and calling join() for each task_done()

  2. Rewriting the SIGTERM signal handler to wait for the queue to be emptied

  3. Using a JoinableQueue and calling join() only within the SIGTERM signal handler

The results:

  1. Processing speed collapsed, but termination worked properly.

  2. and 3. Termination does not work reliably the way I implemented it: sometimes it worked, sometimes it did not, so these methods gave me no reliable output or insight.

An attempt at (3) is the following:

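from multiprocessing import Lock, Process, Queue  # a JoinableQueue is passed in, since queue.join() is used
from signal import SIGTERM, signal


class Worker(Process):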
 
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
        self.abort = False
        self.lock = Lock()
        signal(SIGTERM, self.stop)
 
    def run(self):
        i = 0
        while True:
            self.lock.acquire()
            if self.abort:
                break
            else:
                self.queue.put(i)
                i += 1
            self.lock.release()
        exit(0)
 
    def stop(self, sig, frame):
        self.abort = True
        self.queue.put(None)
        self.queue.join()
        exit(0)

Solution

  • Several approaches are possible, but if you want a compromise between performance and robustness, I'd suggest using the signal handler only to set a .running flag on the worker and checking it with while self.running inside worker.run(). After the loop exits, the worker itself sends the sentinel value. This ensures the sentinel is always the last item in the queue and that the listener reads every value before it. Together, this layout allows a graceful shutdown of the worker while avoiding more expensive synchronization just to check for an exit condition.

    from multiprocessing import Process, Queue
    from functools import partial
    from threading import Thread
    from time import sleep
    import signal
    
    
    SENTINEL = 'SENTINEL'
    
    
    def sigterm_handler(signum, frame, worker):
        worker.shutdown()
    
    
    def register_sigterm(worker):
        global sigterm_handler
        sigterm_handler = partial(sigterm_handler, worker=worker)
        signal.signal(signal.SIGTERM, sigterm_handler)
    
    
    class Worker(Process):
    
        def __init__(self, queue: Queue):
            Process.__init__(self)
            self.queue = queue
            self.running = False
    
        def run(self):
            register_sigterm(self)
            self.running = True
            i = 0
            while self.running:
                self.queue.put(i)
                i += 1
            self.queue.put(SENTINEL)
    
        def stop(self):  # called by parent
            self.terminate()
    
        def shutdown(self):  # called by child from signal-handler
            self.running = False
    
    
    class Listener(Thread):
    
        def __init__(self):
            Thread.__init__(self)
            self.queue = Queue()
            self.worker = Worker(self.queue)
    
        def run(self):
            self.worker.start()
            for data in iter(self.queue.get, SENTINEL):
                print(data)
    
        def stop(self):
            self.worker.stop()
            self.worker.join()
    
    
    class System:
    
        def __init__(self):
            self.listener = Listener()
    
        def start(self):
            self.listener.start()
    
        def stop(self):
            self.listener.stop()
    
    
    if __name__ == "__main__":
    
        system = System()
        system.start()
        sleep(0.1)
        system.stop()
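
The shutdown order used above (handler only flips a flag, producer sends the sentinel last, consumer reads with the two-argument form of iter()) can be tried out in a single process. The following is only a minimal sketch under stated assumptions: a thread and queue.Queue stand in for the child process and multiprocessing.Queue, and the names Producer and run_demo are hypothetical and not part of the code above. It also assumes it runs in the main thread, because signal.signal() may only be called there.

```python
import queue
import signal
import threading

SENTINEL = "SENTINEL"  # same marker value as in the answer


class Producer(threading.Thread):
    """Hypothetical stand-in for the Worker process (a thread, for the demo)."""

    def __init__(self, out):
        super().__init__()
        self.out = out
        self.running = True

    def run(self):
        i = 0
        while self.running:      # cheap flag check, no extra locking
            self.out.put(i)
            i += 1
        self.out.put(SENTINEL)   # sent after the loop, so it is always last

    def shutdown(self):          # called from the signal handler
        self.running = False


def run_demo():
    out = queue.Queue(maxsize=100)  # bounded, so the demo cannot grow unchecked
    producer = Producer(out)
    # The handler does nothing except flip the flag.
    previous = signal.signal(
        signal.SIGTERM, lambda signum, frame: producer.shutdown()
    )
    try:
        producer.start()
        signal.raise_signal(signal.SIGTERM)  # simulate terminate()
        # iter(callable, sentinel) calls out.get() until it returns SENTINEL.
        received = list(iter(out.get, SENTINEL))
    finally:
        if previous is not None:
            signal.signal(signal.SIGTERM, previous)  # restore the old handler
    producer.join()
    return received
```

Calling run_demo() returns the consecutive integers that were produced before the flag was seen. How many there are depends on timing, but the sentinel is never among them and nothing is left unread in the queue.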
    

    Consider the following experimental approach.

    The idea is to monkey-patch a queue instance in the child so that, once SIGTERM has been received, the next call to queue.put() sends the passed value followed by a specified sentinel value, then calls queue.close() and sys.exit(). This allows a clean shutdown while avoiding repeated flag checking.

    multiprocessing.Queue() is actually just a method on multiprocessing.context.BaseContext, returning a pre-configured instance of multiprocessing.queues.Queue. To avoid interfering with it, I went with composition over inheritance. Testing so far suggests it works just fine.
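
The claim about multiprocessing.Queue being a factory method can be checked directly. This is a small sketch for inspection only; describe_queue_factory is a hypothetical helper name, and the checks reflect CPython's standard library layout as I understand it.

```python
import inspect
import multiprocessing as mp
import multiprocessing.queues
from multiprocessing.context import BaseContext


def describe_queue_factory():
    """Report what `mp.Queue` is and what calling it returns."""
    q = mp.Queue()  # calls the factory method on the default context
    try:
        return {
            # mp.Queue is a bound method, not a class ...
            "is_bound_method": inspect.ismethod(mp.Queue),
            # ... bound to a context object derived from BaseContext ...
            "owner_is_context": isinstance(mp.Queue.__self__, BaseContext),
            # ... and it returns an instance of the real queue class.
            "returns_queue_class": isinstance(q, multiprocessing.queues.Queue),
        }
    finally:
        q.close()
        q.join_thread()
```

Because mp.Queue is a bound method rather than a class, it cannot be used as a base class directly, which is one reason patching an existing instance is the less intrusive route here.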

    stqueue.py

    import sys
    import time
    import signal
    from functools import partial
    from multiprocessing import current_process as curr_p
    
    
    def _shutdown(self):
        self._xput = self.put
        self.put = self.final_put
    
    
    def _final_put(self, obj):
        self._xput(obj)
        self._xput(self._xsentinel)
        self.close()
        sys.exit(0)
    
    
    def _sigterm_handler(signum, frame, queue):
        print(f"[{time.ctime()}, {curr_p().name}] --- handling signal")
        queue.shutdown()
    
    
    def register_sigterm_queue(queue, sentinel):
        """Monkey-patch a queue instance to shut down the process
        after the next call to `queue.put()` upon receipt of SIGTERM.
        """
        queue._xsentinel = sentinel
        queue.shutdown = _shutdown.__get__(queue)
        queue.final_put = _final_put.__get__(queue)
        global _sigterm_handler
        _sigterm_handler = partial(_sigterm_handler, queue=queue)
        signal.signal(signal.SIGTERM, _sigterm_handler)
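
To see the patching mechanism in isolation, without signals or processes, here is a minimal sketch. The ListChannel class, patch() and demo() are hypothetical names used only for illustration, and the close() and sys.exit() calls from _final_put are left out so the result can be inspected.

```python
class ListChannel:
    """Hypothetical stand-in for a queue: put() just appends to a list."""

    def __init__(self):
        self.items = []

    def put(self, obj):
        self.items.append(obj)


def _shutdown(self):
    # Remember the original bound put() and route the next call elsewhere.
    self._xput = self.put
    self.put = self.final_put


def _final_put(self, obj):
    # Deliver the pending value first, then the sentinel as the last item.
    self._xput(obj)
    self._xput(self._xsentinel)


def patch(channel, sentinel):
    channel._xsentinel = sentinel
    # function.__get__(instance) returns a method bound to that instance,
    # so only this one object is patched, not the class.
    channel.shutdown = _shutdown.__get__(channel)
    channel.final_put = _final_put.__get__(channel)


def demo():
    ch = ListChannel()
    patch(ch, "SENTINEL")
    ch.put(1)        # normal put
    ch.shutdown()    # what the SIGTERM handler would trigger
    ch.put(2)        # now routed through _final_put
    return ch.items
```

demo() returns [1, 2, 'SENTINEL']: the value passed to the first put() after shutdown() is still delivered, and the sentinel follows it as the final item.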
    

    main.py

    import time
    from threading import Thread
    import multiprocessing as mp
    from multiprocessing import Process, Queue, current_process as curr_p
    
    import numpy as np
    
    from stqueue import register_sigterm_queue
    
    
    SENTINEL = 'SENTINEL'
    
    
    class Worker(Process):
    
        def __init__(self, queue: Queue):
            Process.__init__(self)
            self.queue = queue
    
        def run(self):
            register_sigterm_queue(self.queue, SENTINEL)  # <<<
            while True:
                print(f"[{time.ctime()}, {curr_p().name}] --- starting numpy")
                r = np.sum(
                    np.unique(np.random.randint(0, 2500, 100_000_000))
                )
                print(f"[{time.ctime()}, {curr_p().name}] --- ending numpy")
                self.queue.put(r)
    
        def stop(self):  # called by parent
            self.terminate()
    
    ...
    
    
    if __name__ == "__main__":
    
        import logging
        mp.log_to_stderr(logging.DEBUG)
    
        system = System()
        system.start()
        time.sleep(10)
        print(f"[{time.ctime()}, {curr_p().name}] --- sending signal")
        system.stop()
        print(f"[{time.ctime()}, {curr_p().name}] --- signal send")
    

    Example Output:

    [DEBUG/MainProcess] created semlock with handle 140000699432960
    [DEBUG/MainProcess] created semlock with handle 140000699428864
    [DEBUG/MainProcess] created semlock with handle 140000664752128
    [DEBUG/MainProcess] Queue._after_fork()
    [Sat Oct 24 21:59:58 2020, Worker-1] --- starting numpy
    [DEBUG/Worker-1] recreated blocker with handle 140000699432960
    [DEBUG/Worker-1] recreated blocker with handle 140000699428864
    [DEBUG/Worker-1] recreated blocker with handle 140000664752128
    [DEBUG/Worker-1] Queue._after_fork()
    [INFO/Worker-1] child process calling self.run()
    [DEBUG/Worker-1] Queue._start_thread()
    [DEBUG/Worker-1] doing self._thread.start()
    [DEBUG/Worker-1] starting thread to feed data to pipe
    [DEBUG/Worker-1] ... done self._thread.start()
    [Sat Oct 24 22:00:04 2020, Worker-1] --- ending numpy
    [Sat Oct 24 22:00:04 2020, Worker-1] --- starting numpy
    3123750
    [Sat Oct 24 22:00:08 2020, MainProcess] --- sending signal
    [Sat Oct 24 22:00:10 2020, Worker-1] --- handling signal
    [DEBUG/Worker-1] telling queue thread to quit
    [INFO/Worker-1] process shutting down
    [DEBUG/Worker-1] running all "atexit" finalizers with priority >= 0
    [DEBUG/Worker-1] running the remaining "atexit" finalizers
    [DEBUG/Worker-1] joining queue thread
    [DEBUG/Worker-1] feeder thread got sentinel -- exiting
    [DEBUG/Worker-1] ... queue thread joined
    [INFO/Worker-1] process exiting with exitcode 0
    [Sat Oct 24 22:00:10 2020, Worker-1] --- ending numpy
    3123750
    [Sat Oct 24 22:00:10 2020, MainProcess] --- signal send
    [INFO/MainProcess] process shutting down
    [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
    [DEBUG/MainProcess] running the remaining "atexit" finalizers
    
    Process finished with exit code 0