Tags: python, process, multiprocessing, queue, python-multiprocessing

How to handle abnormal child process termination?


I'm using Python 3.7 and following this documentation. I want to have a process which spawns a child process, waits for it to finish a task, and gets some info back. I use the following code:

from multiprocessing import Process, Queue

if __name__ == '__main__':
    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    print(q.get())
    p.join()

When the child process finishes correctly there is no problem and it works great, but the problem starts when the child process is terminated before it finishes. In that case, my application hangs on the blocking q.get() call.

Giving a timeout to q.get() and p.join() does not completely solve the issue, because I want to know immediately that the child process died, rather than waiting for the timeout to expire.

Another problem is that a timeout on q.get() raises an exception, which I'd prefer to avoid.

Can someone suggest a more elegant way to overcome these issues?


Solution

  • Queue & Signal

    One possibility would be registering a signal handler and using it to pass a sentinel value. On Unix you could handle SIGCHLD in the parent, but that's not an option in your case. According to the signal module documentation:

    On Windows, signal() can only be called with SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, SIGTERM, or SIGBREAK.

    Not sure if killing it through Task-Manager will translate into SIGTERM, but you can give it a try.

    For handling SIGTERM you would need to register the signal handler in the child.

    import os
    import sys
    import time
    import signal
    from functools import partial
    from multiprocessing import Process, Queue
    
    SENTINEL = None
    
    
    def _sigterm_handler(signum, frame, queue):
        print("received SIGTERM")
        queue.put(SENTINEL)
        sys.exit()
    
    
    def register_sigterm(queue):
        global _sigterm_handler
        _sigterm_handler = partial(_sigterm_handler, queue=queue)
        signal.signal(signal.SIGTERM, _sigterm_handler)
    
    
    def some_func(q):
        register_sigterm(q)
        print(os.getpid())
        for i in range(30):
            time.sleep(1)
            q.put(f'msg_{i}')
        q.put(SENTINEL)  # signal normal completion so the parent's loop also ends
    
    
    if __name__ == '__main__':
    
        q = Queue()
        p = Process(target=some_func, args=(q,))
        p.start()
        for msg in iter(q.get, SENTINEL):
            print(msg)
        p.join()
    

    Example Output:

    12273
    msg_0
    msg_1
    msg_2
    msg_3
    received SIGTERM
    
    Process finished with exit code 0
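    On Unix you don't need Task-Manager to exercise this path: `Process.terminate()` delivers SIGTERM to the child, so the handler runs and the sentinel reaches the parent. A minimal sketch of that idea (the `worker` name is illustrative; note that on Windows `terminate()` uses `TerminateProcess()`, which bypasses signal handlers entirely):

    ```python
    import sys
    import time
    import signal
    from functools import partial
    from multiprocessing import Process, Queue

    SENTINEL = None


    def _sigterm_handler(signum, frame, queue):
        queue.put(SENTINEL)  # unblock the parent's q.get() before dying
        sys.exit()


    def worker(q):
        # register the handler inside the child, as in the answer above
        signal.signal(signal.SIGTERM, partial(_sigterm_handler, queue=q))
        for i in range(30):
            time.sleep(0.2)
            q.put(f'msg_{i}')


    if __name__ == '__main__':
        q = Queue()
        p = Process(target=worker, args=(q,))
        p.start()
        time.sleep(1)   # let a few messages through
        p.terminate()   # SIGTERM on Unix -> handler puts the sentinel
        for msg in iter(q.get, SENTINEL):
            print(msg)
        p.join()
    ```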
    

  • Queue & Process.is_alive()

    Even if this works with Task-Manager, your use case sounds like you can't exclude force kills, so I think you're better off with an approach that doesn't rely on signals.

    You can check in a loop whether your process is still alive with p.is_alive(), call queue.get() with a timeout specified, and handle the Empty exception:

    import os
    import time
    from queue import Empty
    from multiprocessing import Process, Queue
    
    def some_func(q):
        print(os.getpid())
        for i in range(30):
            time.sleep(1)
            q.put(f'msg_{i}')
    
    
    if __name__ == '__main__':
    
        q = Queue()
        p = Process(target=some_func, args=(q,))
        p.start()
    
        while p.is_alive():
            try:
                msg = q.get(timeout=0.1)
            except Empty:
                pass
            else:
                print(msg)
    
        p.join()
    

    It would also be possible to avoid the exception entirely, but I wouldn't recommend this, because you don't spend your waiting time "on the queue", which decreases responsiveness:

    while p.is_alive():
        if not q.empty():
            msg = q.get_nowait()
            print(msg)
        else:
            time.sleep(0.1)  # sleep only when there was nothing to read
    

  • Pipe & Process.is_alive()

    If you intend to use one connection per child, it would however be possible to use a pipe instead of a queue. It's more performant than a queue (which is built on top of a pipe), and you can use multiprocessing.connection.wait (Python 3.3+) to await readiness of multiple objects at once.

    multiprocessing.connection.wait(object_list, timeout=None)

    Wait till an object in object_list is ready. Returns the list of those objects in object_list which are ready. If timeout is a float then the call blocks for at most that many seconds. If timeout is None then it will block for an unlimited period. A negative timeout is equivalent to a zero timeout.

    For both Unix and Windows, an object can appear in object_list if it is a readable Connection object; a connected and readable socket.socket object; or the sentinel attribute of a Process object. A connection or socket object is ready when there is data available to be read from it, or the other end has been closed.

    Unix: wait(object_list, timeout) is almost equivalent to select.select(object_list, [], [], timeout). The difference is that, if select.select() is interrupted by a signal, it can raise OSError with an error number of EINTR, whereas wait() will not.

    Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function WaitForMultipleObjects()) or it can be an object with a fileno() method which returns a socket handle or pipe handle. (Note that pipe handles and socket handles are not waitable handles.)
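    The timeout and return-value semantics quoted above are easy to verify on a pair of local pipe ends, without any child process; a quick sketch:

    ```python
    from multiprocessing import Pipe
    from multiprocessing.connection import wait

    reader, writer = Pipe(duplex=False)

    # nothing to read yet: wait() blocks for the timeout, then returns []
    assert wait([reader], timeout=0.1) == []

    writer.send('ping')
    # data is available, so the read end is returned as "ready"
    assert wait([reader], timeout=0.1) == [reader]
    assert reader.recv() == 'ping'

    writer.close()
    # a closed other end also counts as "ready" (recv() would raise EOFError)
    assert wait([reader], timeout=0.1) == [reader]
    ```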

    You can use this to await the sentinel attribute of the process and the parental end of the pipe concurrently.

    import os
    import time
    from multiprocessing import Process, Pipe
    from multiprocessing.connection import wait
    
    
    def some_func(conn_write):
        print(os.getpid())
        for i in range(30):
            time.sleep(1)
            conn_write.send(f'msg_{i}')
    
    
    if __name__ == '__main__':
    
        conn_read, conn_write = Pipe(duplex=False)
        p = Process(target=some_func, args=(conn_write,))
        p.start()
    
        while p.is_alive():
            wait([p.sentinel, conn_read])  # block-wait until something gets ready
            if conn_read.poll():  # check if something can be received
                print(conn_read.recv())
        p.join()
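
    Once the loop exits, `Process.exitcode` lets you distinguish a clean finish from an abnormal termination: 0 means a normal return, and on Unix a negative value -N means the child was killed by signal N (on Windows, `terminate()` sets a non-zero positive code instead). A small sketch (the `victim` name is illustrative):

    ```python
    import time
    from multiprocessing import Process


    def victim():
        time.sleep(30)  # long enough to be killed first


    if __name__ == '__main__':
        p = Process(target=victim)
        p.start()
        p.terminate()  # SIGTERM on Unix
        p.join()
        if p.exitcode == 0:
            print('child exited normally')
        else:
            # e.g. -15 on Unix: killed by SIGTERM (signal.SIGTERM == 15)
            print(f'child died abnormally, exitcode={p.exitcode}')
    ```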