Search code examples
pythonpython-3.xconcurrencysubprocessterminate

ProcessPoolExecutor locks more futures than necessary


I'm using the ProcessPoolExecutor to spawn subprocesses. Currently I'm trying to gracefully exit the script on a KeyboardInterrupt / Ctrl+C.

I'm creating the pool with 2 workers and submit 5 futures. On interrupt I'm trying to cancel every future that has not been executed yet. If I interrupt during the execution of the first two futures, the pool can only cancel two futures, meaning that three are currently running. But I only have two workers and each process runs for 5 seconds. What or why is my future executing?

import subprocess
from concurrent.futures import ProcessPoolExecutor
import signal
import sys


def executecommands(commands):
    # Replace signal handler of parent process, so child processes will ignore terminate signals
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    processpool = ProcessPoolExecutor(1)
    # Restore original handler, so the parent process can handle terminate signals
    signal.signal(signal.SIGINT, original_sigint_handler)
    futures = []
    try:
        for command in commands:
            futures.append(processpool.submit(executecommand, command))

        processpool.shutdown()
        print("Executed commands without interruption")
    except KeyboardInterrupt:
        print("\nAttempting to cancel pending commands..")
        for future in futures:
            if future.cancel():
                print("Cancelled one command")
            else:
                print("One command was already running, couldn't cancel")
        print("Waiting for running processes to finish..")
        processpool.shutdown()
        print("Shutdown complete")
        sys.exit(0)


def executecommand(command):
    # create a subprocess and run it
    print("running command")
    process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    print("finished command")
    return process

if __name__ == '__main__':
    print("lets go")
    commandlist = [['/bin/sleep', '5'], ['/bin/sleep', '5'], ['/bin/sleep', '5'], ['/bin/sleep', '5'], ['/bin/sleep', '5']]
    executecommands(commandlist)

Solution

  • This is a CPython implementation detail, but the only futures that you can cancel are the ones that are not in the "call queue". The call queue contains all the futures that are going to be executed next. Its size is max_workers + EXTRA_QUEUED_CALLS. (EXTRA_QUEUED_CALLS is currently set to 1.)

    In your situation, when your first two futures have started executing, the call queue is filled with the 3 next futures (max_workers is 2 and EXTRA_QUEUED_CALLS is 1). Since you only have 5 futures in total, you can't cancel any of them.

    If you fill your command list with 10 futures with 2 workers, you will be able to cancel the last five futures:

    lets go
    running command
    running command
    ^C
    Attempting to cancel pending commands..
    One command was already running, couldn't cancel
    One command was already running, couldn't cancel
    One command was already running, couldn't cancel
    One command was already running, couldn't cancel
    One command was already running, couldn't cancel
    Cancelled one command
    Cancelled one command
    Cancelled one command
    Cancelled one command
    Cancelled one command
    Waiting for running processes to finish..
    running command
    running command
    finished command
    finished command
    running command
    finished command
    Shutdown complete