Tags: python, error-handling, concurrency, multiprocessing, future

How does Ctrl-C work with multiple processes in Python?


I am trying to distribute work across multiple processes. Exceptions that occur in a worker process should be propagated back to the main process and handled there. This works for exceptions raised in the worker, but not for Ctrl-C.

import time
from concurrent.futures import ProcessPoolExecutor, Future, wait
import traceback


def worker():
    time.sleep(5)
    # raise RuntimeError("Some Error")
    return True


def main():
    with ProcessPoolExecutor() as executor:
        stop = False
        tasks = set()

        while True:
            try:
                # keep submitting new tasks
                if not stop:
                    time.sleep(0.1)
                    print("Submitting task to worker")
                    future = executor.submit(worker)
                    tasks.add(future)

                done, not_done = wait(tasks, timeout=0)
                tasks = not_done

                # get the result
                for future in done:
                    try:
                        result = future.result()
                    except Exception as e:
                        print(f"Exception in worker: {type(e).__name__}: {e}")
                    else:
                        print(f"Worker finished with result: {result}")

                # exit loop if there are no tasks left and loop was stopped
                if stop:
                    print(f"Waiting for {len(tasks)} to finish.")
                    if not len(tasks):
                        print("Finished all remaining tasks")
                        break
                    time.sleep(1)
            except KeyboardInterrupt:
                print("Recieved Ctrl-C")
                stop = True
            except Exception as e:
                print(f"Caught {e}")
                stop = True


if __name__ == "__main__":
    main()

Some of my observations:

  • When you run this script and press Ctrl-C, the KeyboardInterrupt exception is raised multiple times.
  • If you remove the except KeyboardInterrupt clause, Ctrl-C is not caught at all, although my understanding was that the second except clause should catch all exceptions.
  • All exceptions raised in the worker are re-raised by future.result() (expected behaviour).

I suspect that if Ctrl-C is pressed while a process is being spawned it could lead to some unexpected behaviour.

Edit: These problems occur on both Linux and Windows. Ideally a solution would cover both, but if in doubt it should work on Linux.


Solution

  • It wasn't clear to me whether you want your worker function to continue running until completion (normal or abnormal) ignoring any Ctrl-C events. Assuming that to be the case, the following code should work under both Linux and Windows.

    The idea is to use a "pool initializer", i.e. a function that runs once in each pool process before it executes any submitted tasks. Here the initializer sets the SIGINT handler to signal.SIG_IGN, so Ctrl-C no longer raises KeyboardInterrupt inside the workers.

    Note that I have made a few other code adjustments (marked with comments).

    import signal
    import time
    from concurrent.futures import ProcessPoolExecutor, wait
    
    def init_pool_processes():
        # Ignore Ctrl-C
        signal.signal(signal.SIGINT, signal.SIG_IGN)
    
    
    def worker():
        time.sleep(5)
        # raise RuntimeError("Some Error")
        return True
    
    
    def main():
        # Create pool child processes, which will now
        # ignore Ctrl-C
        with ProcessPoolExecutor(initializer=init_pool_processes) as executor:
            stop = False
            tasks = set()
    
            while True:
                try:
                    # keep submitting new tasks
                    if not stop:
                        print("Submitting task to worker")
                        future = executor.submit(worker)
                        tasks.add(future)
                        # Moved here (the original slept before submitting):
                        time.sleep(0.1)
    
                    done, not_done = wait(tasks, timeout=0)
                    tasks = not_done
    
                    # get the result
                    for future in done:
                        try:
                            result = future.result()
                        except Exception as e:
                            print(f"Exception in worker: {type(e).__name__}: {e}")
                        else:
                            print(f"Worker finished with result: {result}")
    
                    # exit loop if there are no tasks left and loop was stopped
                    if stop:
                        print(f"Waiting for {len(tasks)} to finish.")
                        if not len(tasks):
                            print("Finished all remaining tasks")
                            break
                        time.sleep(1)
                except KeyboardInterrupt:
                    # Ignore future Ctrl-C events:
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                    print("Received Ctrl-C") # spelling
                    stop = True
                except Exception as e:
                    print(f"Caught {e}")
                    # Ignore future Ctrl-C events:
                    if not stop:
                        signal.signal(signal.SIGINT, signal.SIG_IGN)
                        stop = True
    
    
    if __name__ == "__main__":
        main()
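
On the question's second observation: except Exception never sees Ctrl-C because KeyboardInterrupt derives from BaseException rather than Exception (the same is true of SystemExit). A minimal check, where caught_by_except_exception is a hypothetical helper written only for this illustration:

```python
def caught_by_except_exception(exc: BaseException) -> bool:
    """Return True if a plain `except Exception` clause would catch exc.

    Hypothetical helper for illustration; not part of the original script.
    """
    try:
        raise exc
    except Exception:
        # Ordinary errors (RuntimeError, ValueError, ...) land here.
        return True
    except BaseException:
        # KeyboardInterrupt and SystemExit skip the clause above.
        return False


print(issubclass(KeyboardInterrupt, Exception))      # False
print(issubclass(KeyboardInterrupt, BaseException))  # True
print(caught_by_except_exception(RuntimeError("Some Error")))  # True
print(caught_by_except_exception(KeyboardInterrupt()))         # False
```

This likely also accounts for the repeated KeyboardInterrupt in the original script: on Linux, Ctrl-C delivers SIGINT to every process in the foreground process group, workers included, and an exception raised inside a running task is sent back and re-raised by future.result() in the main process. Since the inner handler there is also except Exception, the re-raised KeyboardInterrupt falls through to the outer except KeyboardInterrupt clause.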