Search code examples
pythonmultiprocessingpython-asyncioprocess-pool

Asyncio with ProcessPoolExecutor shutdown before finishing all tasks


I wanted to combine ProcessPoolExecutor with asyncio to run the blocking functions in TestClass concurrently. Each task is intended to run for a long time, so I need a working shutdown procedure so that the script exits cleanly. Any ideas where I need to add error handling for KeyboardInterrupt to smoothly shut down all tasks and processes? I have searched many related topics and none of them solves my issue the way I want. Hope to get some help! Thanks in advance.

import asyncio
from concurrent.futures import ProcessPoolExecutor


class TestClass:
    """Minimal stand-in for the class whose blocking work runs in the pool.

    ``task`` submits the class itself as the executor callable, so the
    constructor is what actually executes in a worker process.
    """

    def __init__(self) -> None:
        # Two placeholder attributes; nothing else is initialised.
        self.value1, self.value2 = 1, 2


async def task(loop, executor_processes, i):
    """Build a ``TestClass`` in the executor and log progress for task *i*.

    loop: event loop whose ``run_in_executor`` is awaited.
    executor_processes: executor handed to ``run_in_executor``.
    i: label used in the two log lines.
    """
    print(f"[TASK {i}] Initializing Abck class")
    # The class object is the callable, so construction happens off-loop.
    instance = await loop.run_in_executor(executor_processes, TestClass)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")


async def main():
    """Fan tasks 1..99 out over a five-worker process pool and await them all."""
    loop_ = asyncio.get_event_loop()
    executor_processes = ProcessPoolExecutor(max_workers=5)

    # Create every coroutine up front; gather() schedules them together.
    tasks = [task(loop_, executor_processes, i) for i in range(1, 100)]

    await asyncio.gather(*tasks)


if __name__ == '__main__':
    # Script entry point: drive main() to completion with asyncio.run().
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # A KeyboardInterrupt that propagates out of asyncio.run() lands here.
        print("ctrl + c")
    finally:
        # Printed on both the normal and the interrupted path.
        print('Program finished')

Here are the error logs after pressing Ctrl+C before all tasks and processes have finished.

Fatal Python error: Fatal Python error: init_import_sizeinit_import_size: : Failed to import the site moduleFailed to import the site module

Python runtime state: Python runtime state: initializedinitialized

Traceback (most recent call last):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    main()
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    main()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    addsitepackages(known_paths, [sys.prefix])
    addsitedir(sitedir, known_paths)
    addsitedir(sitedir, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
    addsitedir(sitedir, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
    known_paths = venv(known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
    addsitedir(sitedir, known_paths)
    exec(line)
    exec(line)
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
  File "<string>", line 1, in <module>
  File "<string>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 2, in <module>
    exec(line)
  File "<string>", line 1, in <module>
    from contextlib import contextmanager
    from . import abc
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 6, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/abc.py", line 4, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
    addsitepackages(known_paths, [sys.prefix])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
    from contextlib import contextmanager
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 165, in <module>
    exec(line)
  File "<string>", line 1, in <module>
    addsitedir(sitedir, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
    from contextlib import contextmanager
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 5, in <module>
    addpackage(sitedir, name, known_paths)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 160, in addpackage
    f = io.TextIOWrapper(io.open_code(fullname))
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 779, in exec_module
  File "<frozen importlib._bootstrap_external>", line 911, in get_code
  File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
KeyboardInterrupt
    from . import machinery
KeyboardInterrupt
    from functools import wraps
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 438, in <module>
    from collections import deque
    class _AsyncGeneratorContextManager(_GeneratorContextManagerBase,
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 21, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/abc.py", line 85, in __new__
    cls = super().__new__(mcls, name, bases, namespace, **kwargs)
KeyboardInterrupt
    from operator import itemgetter as _itemgetter, eq as _eq
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 779, in exec_module
    _CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 394, in namedtuple
  File "<frozen importlib._bootstrap_external>", line 911, in get_code
  File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
Exception in thread QueueManagerThread:
Traceback (most recent call last):
KeyboardInterrupt
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    exec(s, namespace)
  File "<string>", line 1, in <module>
ctrl + c
Program finished
KeyboardInterrupt
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 394, in _queue_management_worker
    work_item.future.set_exception(bpe)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 539, in set_exception
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <Future at 0x7ffed1f2f250 state=cancelled>

Solution

  • Windows Solution

    If you are running on Windows then CTRL-C interrupt handling does not seem to work too well with multiprocessing pools. The following is a bit clumsy but seems to work after a fashion.

    The idea is to initialize each process in the multiprocessing pool with a global variable ctrl_c_entered that is initially set to False. I have completed your class TestClass with a method foo, which will be the worker function invoked. When invoked, it must:

    1. Test the global flag ctrl_c_entered and if True, immediately return.
    2. Have its own KeyboardInterrupt handler and on such an interrupt it must set the global ctrl_c_entered flag to True and return.
    3. Update: However, the CTRL-C could be entered when the pool process has not transferred control to the worker function. For instance, it could be in the process of grabbing the next task to run from the input queue. In this case there would otherwise be no try/except for KeyboardInterrupt exceptions in effect. So we need to set an interrupt handler for the SIGINT interrupt for each process in the pool that will set the ctrl_c_entered flag to True. But this now means that the original, default SIGINT interrupt handler must be temporarily restored in Step 2 above in order to catch KeyboardInterrupt exceptions.

    You also have to let all the submitted asyncio tasks complete. So we set a signal.SIGINT interrupt handler that sets a global ctrl_c_entered flag for the main process to True if CTRL-C has been entered (we do not break out of the asyncio.run(main()) statement). Our long-running asyncio tasks must check this ctrl_c_entered flag and terminate if it is set to True.

    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    import signal
    import time
    from functools import wraps
    
    def handle_ctrl_c(func):
        """Decorate a pool worker function so CTRL-C yields a value, not a crash.

        The wrapper returns a ``KeyboardInterrupt`` *instance* (it is never
        raised) when the per-process ``ctrl_c_entered`` flag is already set or
        when CTRL-C arrives while ``func`` is running; otherwise it returns
        whatever ``func`` returns.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            global ctrl_c_entered
            # CTRL-C already seen in this process: skip the work entirely.
            if ctrl_c_entered:
                return KeyboardInterrupt()
            # Restore the saved handler so that CTRL-C raises inside func().
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                outcome = func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                outcome = KeyboardInterrupt()
            finally:
                # Back to the flag-setting handler used between tasks.
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
            return outcome
        return wrapper
    
    
    class TestClass:
        """Example worker class; ``foo`` is the function executed in the pool."""

        def __init__(self) -> None:
            self.value1, self.value2 = 1, 2

        @handle_ctrl_c
        def foo(self, i):
            """Sleep one second to simulate blocking work, then return ``i ** 2``."""
            time.sleep(1)
            squared = i ** 2
            return squared
    
    async def task(loop, executor_processes, i):
        """Run ``TestClass().foo(i)`` in the executor unless CTRL-C was already seen.

        Returns the value produced by the worker call, or a ``KeyboardInterrupt``
        instance (returned, not raised) when the ``ctrl_c_entered`` flag is
        already set on entry.
        """
        # If this is a long-running task, periodically check running flag and return if set.
        # For example:
        if ctrl_c_entered:
            return KeyboardInterrupt()
        print(f"[TASK {i}] Initializing Abck class")
        worker = TestClass().foo
        result = await loop.run_in_executor(executor_processes, worker, i)
        # other async and sync functions contained in TestClass
        print(f"[TASK {i}] Finished")
        return result
    
    def pool_ctrl_c_handler(*args, **kwargs):
        """Record that CTRL-C was received by setting ``ctrl_c_entered``.

        Accepts and ignores any arguments so it can be installed directly
        with ``signal.signal``.
        """
        global ctrl_c_entered
        ctrl_c_entered = True
    
    def init_pool():
        """Pool initializer: reset the CTRL-C flag and install the flag-setting handler.

        The handler that was active before is kept in
        ``default_sigint_handler`` so it can be restored later.
        """
        global ctrl_c_entered, default_sigint_handler
        # set global variable for each process in the pool:
        ctrl_c_entered = False
        # signal.signal() returns the handler it replaces.
        previous_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        default_sigint_handler = previous_handler
    
    async def main():
        """Run tasks 1..99 on a five-process pool, print the results, release the pool.

        The executor is used as a context manager so that ``shutdown()`` is
        called on every exit path; the original never shut the pool down and
        left the worker processes to be reaped at interpreter exit.
        """
        # Inside a coroutine the running loop is the one to use.
        loop_ = asyncio.get_running_loop()

        with ProcessPoolExecutor(max_workers=5, initializer=init_pool) as executor_processes:
            # Each entry is either the worker's return value or a
            # KeyboardInterrupt instance returned by task().
            results = await asyncio.gather(
                *(task(loop_, executor_processes, i) for i in range(1, 100))
            )
        print(results)
    
    def ctrl_c_handler(*args, **kwargs):
        """Set ``ctrl_c_entered`` to ``True`` instead of raising on CTRL-C.

        Any arguments passed in are accepted and ignored.
        """
        global ctrl_c_entered
        ctrl_c_entered = True
    
    if __name__ == '__main__':
        # The flag must exist before any task() coroutine reads it.
        ctrl_c_entered = False
        # Replace the SIGINT handler so CTRL-C only sets the flag; no
        # KeyboardInterrupt handling is wrapped around asyncio.run() below.
        signal.signal(signal.SIGINT, ctrl_c_handler)
        asyncio.run(main())
        print('Program finished')
    

    Prints:

    [TASK 1] Initializing Abck class
    [TASK 2] Initializing Abck class
    [TASK 3] Initializing Abck class
    [TASK 4] Initializing Abck class
    [TASK 5] Initializing Abck class
    [TASK 6] Initializing Abck class
    [TASK 7] Initializing Abck class
    [TASK 8] Initializing Abck class
    [TASK 9] Initializing Abck class
    [TASK 10] Initializing Abck class
    [TASK 11] Initializing Abck class
    [TASK 12] Initializing Abck class
    [TASK 13] Initializing Abck class
    [TASK 14] Initializing Abck class
    [TASK 15] Initializing Abck class
    [TASK 16] Initializing Abck class
    [TASK 17] Initializing Abck class
    [TASK 18] Initializing Abck class
    [TASK 19] Initializing Abck class
    [TASK 1] Finished
    [TASK 2] Finished
    [TASK 3] Finished
    [TASK 4] Finished
    [TASK 5] Finished
    [TASK 6] Finished
    [TASK 7] Finished
    [TASK 9] Finished
    [TASK 8] Finished
    [TASK 10] Finished
    ctrl + c
    ctrl + c
    ctrl + c
    ctrl + c
    ctrl + c
    [TASK 13] Finished
    [TASK 16] Finished
    [TASK 17] Finished
    [TASK 18] Finished
    [TASK 19] Finished
    [TASK 14] Finished
    [TASK 12] Finished
    [TASK 11] Finished
    [TASK 15] Finished
    [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt()]
    

    Linux and Platforms that Use Fork Solution

    This is simpler in that interrupt handling more or less works for multiprocessing pools. The easiest way to handle this is to again have a global running flag initialized for each pool process that the worker function can periodically check and terminate if False. Each pool process will set a CTRL-C handler and set running to False when the user has entered CTRL-C. This will take care of terminating any already-running tasks. The main process can simply handle the KeyboardInterrupt exception:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    import signal
    import time
    
    class TestClass:
        """Example worker class whose ``foo`` polls the ``running`` flag."""

        def __init__(self) -> None:
            self.value1, self.value2 = 1, 2

        def foo(self):
            """Sleep in up to twenty 0.1 s slices, returning early once ``running`` is false."""
            slices_left = 20
            while slices_left:
                # Check the flag before every slice so CTRL-C takes effect quickly.
                if not running:
                    return
                time.sleep(.1)
                slices_left -= 1
    
    async def task(loop, executor_processes, i):
        """Run ``TestClass().foo`` in the executor and log progress for task *i*."""
        print(f"[TASK {i}] Initializing Abck class")
        worker = TestClass().foo
        outcome = await loop.run_in_executor(executor_processes, worker)
        # other async and sync functions contained in TestClass
        print(f"[TASK {i}] Finished")
    
    def ctrl_c_handler(*args, **kwargs):
        """Clear the ``running`` flag so polling worker code stops.

        Any arguments passed in are accepted and ignored.
        """
        global running
        running = False
    
    def init_pool():
        """Pool initializer: mark the process as running and install the CTRL-C handler."""
        global running
        # Set the flag first so it exists before the handler can ever fire.
        running = True
        signal.signal(signal.SIGINT, ctrl_c_handler)
    
    async def main():
        """Run tasks 1..99 on a five-process pool and release the pool afterwards.

        The executor is used as a context manager so that ``shutdown()`` is
        called on every exit path, including the cancellation that follows a
        CTRL-C in the parent; the original never shut the pool down.
        """
        # Inside a coroutine the running loop is the one to use.
        loop_ = asyncio.get_running_loop()

        with ProcessPoolExecutor(max_workers=5, initializer=init_pool) as executor_processes:
            await asyncio.gather(
                *(task(loop_, executor_processes, i) for i in range(1, 100))
            )
    
    if __name__ == '__main__':
        # Guard the entry point: under the "spawn" start method (Windows, and
        # the macOS default since Python 3.8) every worker re-imports this
        # module, and an unguarded asyncio.run(main()) would run again in each
        # worker instead of only in the parent.
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print("ctrl + c")
        print('Program finished')