python, python-multiprocessing, python-multithreading

Cancel a ProcessPoolExecutor future that has hung


I have a Python function that calls into a C library I cannot control or update. Unfortunately, the C library has an intermittent bug and occasionally hangs. To protect my application from hanging too, I'm trying to isolate the function call in a ThreadPoolExecutor or ProcessPoolExecutor so that only that thread or process is affected.

However, the following code hangs, because the executor cannot shut down while the worker is still running!

Is it possible to cancel an executor with a future that has hung?

import time
from concurrent.futures import ThreadPoolExecutor, wait

if __name__ == "__main__":
    def hang_forever(*args):
        print("Starting hang_forever")
        time.sleep(10.0)
        print("Finishing hang_forever")

    print("Starting executor")
    with ThreadPoolExecutor() as executor:
        future = executor.submit(hang_forever)
        print("Submitted future")
        done, not_done = wait([future], timeout=1.0)
        print("Done", done, "Not done", not_done)
        # The with block never exits because the future has hung!
        if len(not_done) > 0:
            raise IOError("Timeout")

Solution

  • The documentation for Executor.shutdown() says that the program cannot exit until all pending futures are done executing:

    Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.

    Calling future.cancel() won't help either: a call that is already running cannot be cancelled, so cancel() just returns False. Fortunately, you can solve your problem by using multiprocessing.Process directly instead of ProcessPoolExecutor:

    import time
    from multiprocessing import Process
    
    
    def hang_forever():
        while True:
            print('hang forever...')
            time.sleep(1)
    
    
    def main():
        proc = Process(target=hang_forever)
    
        print('start the process')
        proc.start()
    
        time.sleep(1)
    
        timeout = 3
        print(f'trying to join the process in {timeout} sec...')
        proc.join(timeout)
    
        if proc.is_alive():
            print('timeout is exceeded, terminate the process!')
            proc.terminate()
            proc.join()
    
        print('done')
    
    
    if __name__ == '__main__':
        main()
    

    Output:

    start the process
    hang forever...
    trying to join the process in 3 sec...
    hang forever...
    hang forever...
    hang forever...
    hang forever...
    timeout is exceeded, terminate the process!
    done