I want to combine ProcessPoolExecutor with asyncio to run the blocking functions in TestClass concurrently. Each task is intended to be long-running, so I need a working shutdown procedure so that everything is cleaned up smoothly when my script exits. Where do I need to add error handling for KeyboardInterrupt to shut down all tasks and processes cleanly? I have searched many related topics, but none of them solves my issue the way I want. Hope to get some help! Thanks in advance.
import asyncio
from concurrent.futures import ProcessPoolExecutor
class TestClass:
    """Placeholder worker class that only stores two integer attributes."""

    def __init__(self) -> None:
        # Fixed sample state; the real class is expected to carry the
        # blocking methods that get dispatched to the process pool.
        self.value1 = 1
        self.value2 = 2
async def task(
    loop: asyncio.AbstractEventLoop,
    executor_processes: ProcessPoolExecutor,
    i: int,
) -> None:
    """Build a ``TestClass`` instance in the process pool and log progress.

    Args:
        loop: Event loop used to schedule the executor call.
        executor_processes: Pool in which ``TestClass`` is constructed.
        i: Task number, used only in the progress messages.
    """
    print(f"[TASK {i}] Initializing Abck class")
    # The constructor runs in a worker process; the resulting instance is
    # sent back to this process and bound here.
    new_test = await loop.run_in_executor(executor_processes, TestClass)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")
async def main() -> None:
    """Run tasks 1 through 99 concurrently on a five-worker process pool."""
    # Inside a coroutine the running loop is the one to use;
    # get_running_loop() never creates a new loop by accident.
    loop_ = asyncio.get_running_loop()
    # The context manager guarantees the pool is shut down when this
    # coroutine finishes, fails, or is cancelled, instead of leaving the
    # cleanup to interpreter exit.
    with ProcessPoolExecutor(max_workers=5) as executor_processes:
        tasks = [task(loop_, executor_processes, i) for i in range(1, 100)]
        await asyncio.gather(*tasks)
if __name__ == '__main__':
    # Script entry point: run the event loop until main() completes and
    # report a CTRL-C instead of letting the traceback escape.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("ctrl + c")
    finally:
        # Printed on both the normal and the interrupted path.
        print('Program finished')
Here are the error logs after pressing ctrl + c before all tasks and processes have finished.
Fatal Python error: Fatal Python error: init_import_sizeinit_import_size: : Failed to import the site moduleFailed to import the site module
Python runtime state: Python runtime state: initializedinitialized
Traceback (most recent call last):
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
main()
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
addsitepackages(known_paths, [sys.prefix])
addsitedir(sitedir, known_paths)
addsitedir(sitedir, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
addsitedir(sitedir, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
addsitedir(sitedir, known_paths)
exec(line)
exec(line)
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
File "<string>", line 1, in <module>
File "<string>", line 1, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 2, in <module>
exec(line)
File "<string>", line 1, in <module>
from contextlib import contextmanager
from . import abc
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 6, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/abc.py", line 4, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
from contextlib import contextmanager
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 165, in <module>
exec(line)
File "<string>", line 1, in <module>
addsitedir(sitedir, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
from contextlib import contextmanager
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 5, in <module>
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 160, in addpackage
f = io.TextIOWrapper(io.open_code(fullname))
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 779, in exec_module
File "<frozen importlib._bootstrap_external>", line 911, in get_code
File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
KeyboardInterrupt
from . import machinery
KeyboardInterrupt
from functools import wraps
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 438, in <module>
from collections import deque
class _AsyncGeneratorContextManager(_GeneratorContextManagerBase,
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 21, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/abc.py", line 85, in __new__
cls = super().__new__(mcls, name, bases, namespace, **kwargs)
KeyboardInterrupt
from operator import itemgetter as _itemgetter, eq as _eq
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 779, in exec_module
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 394, in namedtuple
File "<frozen importlib._bootstrap_external>", line 911, in get_code
File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
Exception in thread QueueManagerThread:
Traceback (most recent call last):
KeyboardInterrupt
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 932, in _bootstrap_inner
exec(s, namespace)
File "<string>", line 1, in <module>
ctrl + c
Program finished
KeyboardInterrupt
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 394, in _queue_management_worker
work_item.future.set_exception(bpe)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 539, in set_exception
raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <Future at 0x7ffed1f2f250 state=cancelled>
Windows Solution
If you are running on Windows then CTRL-C interrupt handling does not seem to work too well with multiprocessing pools. The following is a bit clumsy but seems to work after a fashion.
The idea is to initialize each process in the multiprocessing pool with a global variable ctrl_c_entered that is initially set to False. I have completed your class TestClass with a method foo, which will be the worker function invoked. When invoked, it must:
1. Check the global flag ctrl_c_entered and, if it is True, immediately return.
2. Otherwise, run the submitted task inside a try block that catches KeyboardInterrupt exceptions; if such an exception is caught, set the global ctrl_c_entered flag to True and return.
However, a pool process that is idle when CTRL-C is entered does not have a try/catch for KeyboardInterrupt exceptions in effect. So we need to set an interrupt handler for the SIGINT interrupt for each process in the pool that will set the ctrl_c_entered flag to True. But this now means that the original, default SIGINT interrupt handler must be temporarily restored in Step 2 above in order to catch KeyboardInterrupt exceptions.
You also have to let all the submitted asyncio tasks complete. So we set a signal.SIGINT interrupt handler that sets a global ctrl_c_entered flag for the main process to True if CTRL-C has been entered (we do not break out of the asyncio.run(main()) statement). Our long-running asyncio tasks must check this ctrl_c_entered flag and terminate if it is set to True.
import asyncio
from concurrent.futures import ProcessPoolExecutor
import signal
import time
from functools import wraps
def handle_ctrl_c(func):
    """Decorate a pool worker function so CTRL-C yields a result, not a crash.

    The wrapper relies on three module globals set up elsewhere in this
    file: the ``ctrl_c_entered`` flag, ``default_sigint_handler`` and
    ``pool_ctrl_c_handler``.

    Args:
        func: The worker function to wrap.

    Returns:
        A wrapper that returns ``func``'s result, or a ``KeyboardInterrupt``
        instance (returned, not raised) when CTRL-C was already seen or is
        caught while ``func`` runs.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        # CTRL-C was already seen in this process: skip the work entirely.
        if ctrl_c_entered:
            return KeyboardInterrupt()
        # Put back the handler that was active before the pool handler was
        # installed so that an interrupt during func can surface as
        # KeyboardInterrupt and be caught below.
        signal.signal(signal.SIGINT, default_sigint_handler)  # the default
        try:
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            ctrl_c_entered = True
            return KeyboardInterrupt()
        finally:
            # Re-install the flag-setting handler for the time between tasks.
            signal.signal(signal.SIGINT, pool_ctrl_c_handler)
    return wrapper
class TestClass:
    """Sample worker class whose ``foo`` method runs in a pool process."""

    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2

    @handle_ctrl_c
    def foo(self, i):
        """Sleep for one second to simulate work, then return ``i`` squared.

        Because of ``handle_ctrl_c``, the caller may receive a
        ``KeyboardInterrupt`` instance instead of the square.
        """
        time.sleep(1)
        return i ** 2
async def task(
    loop: asyncio.AbstractEventLoop,
    executor_processes: ProcessPoolExecutor,
    i: int,
):
    """Run ``TestClass().foo(i)`` in the process pool unless CTRL-C was seen.

    Args:
        loop: Event loop used to schedule the executor call.
        executor_processes: Pool that executes ``foo``.
        i: Task number; passed to ``foo`` and used in the progress messages.

    Returns:
        Whatever ``foo`` returned, or a ``KeyboardInterrupt`` instance if the
        main-process ``ctrl_c_entered`` flag was already set on entry.
    """
    # If this is a long-running task, periodically check the ctrl_c_entered
    # flag and return if it is set. For example:
    if ctrl_c_entered:
        return KeyboardInterrupt()
    print(f"[TASK {i}] Initializing Abck class")
    new_test = await loop.run_in_executor(executor_processes, TestClass().foo, i)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")
    return new_test
def pool_ctrl_c_handler(*args, **kwargs):
    """SIGINT handler for pool processes: record that CTRL-C was entered.

    The arguments supplied by the signal machinery are accepted and ignored.
    """
    global ctrl_c_entered
    ctrl_c_entered = True
def init_pool():
    """Pool initializer: reset the CTRL-C flag and install the pool handler.

    Passed as ``initializer`` to the executor, so it runs in each pool
    process.
    """
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    # signal.signal() returns the handler it replaces; keep it so that
    # handle_ctrl_c can put it back while a task is running.
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)
async def main() -> None:
    """Run tasks 1 through 99 on a five-worker pool and print their results."""
    # Inside a coroutine the running loop is the one to use.
    loop_ = asyncio.get_running_loop()
    # The context manager shuts the pool down explicitly once every task has
    # produced a result (or an exception propagates).
    with ProcessPoolExecutor(max_workers=5, initializer=init_pool) as executor_processes:
        tasks = [task(loop_, executor_processes, i) for i in range(1, 100)]
        results = await asyncio.gather(*tasks)
    print(results)
def ctrl_c_handler(*args, **kwargs):
    """SIGINT handler for the main process: record that CTRL-C was entered.

    The arguments supplied by the signal machinery are accepted and ignored.
    """
    global ctrl_c_entered
    ctrl_c_entered = True
if __name__ == '__main__':
    # Main-process flag read by task(); set to True by ctrl_c_handler.
    ctrl_c_entered = False
    # Replace the default SIGINT behaviour so CTRL-C only sets the flag and
    # asyncio.run() is allowed to finish.
    signal.signal(signal.SIGINT, ctrl_c_handler)
    asyncio.run(main())
    print('Program finished')
Prints:
[TASK 1] Initializing Abck class
[TASK 2] Initializing Abck class
[TASK 3] Initializing Abck class
[TASK 4] Initializing Abck class
[TASK 5] Initializing Abck class
[TASK 6] Initializing Abck class
[TASK 7] Initializing Abck class
[TASK 8] Initializing Abck class
[TASK 9] Initializing Abck class
[TASK 10] Initializing Abck class
[TASK 11] Initializing Abck class
[TASK 12] Initializing Abck class
[TASK 13] Initializing Abck class
[TASK 14] Initializing Abck class
[TASK 15] Initializing Abck class
[TASK 16] Initializing Abck class
[TASK 17] Initializing Abck class
[TASK 18] Initializing Abck class
[TASK 19] Initializing Abck class
[TASK 1] Finished
[TASK 2] Finished
[TASK 3] Finished
[TASK 4] Finished
[TASK 5] Finished
[TASK 6] Finished
[TASK 7] Finished
[TASK 9] Finished
[TASK 8] Finished
[TASK 10] Finished
ctrl + c
ctrl + c
ctrl + c
ctrl + c
ctrl + c
[TASK 13] Finished
[TASK 16] Finished
[TASK 17] Finished
[TASK 18] Finished
[TASK 19] Finished
[TASK 14] Finished
[TASK 12] Finished
[TASK 11] Finished
[TASK 15] Finished
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt()]
Linux and Platforms that Use Fork Solution
This is simpler in that interrupt handling more or less works for multiprocessing pools. The easiest way to handle this is to again have a global running flag, initialized for each pool process, that the worker function can periodically check, terminating if it is False. Each pool process will set a CTRL-C handler that sets running to False when the user has entered CTRL-C. This will take care of terminating any already-running tasks. The main process can simply handle the KeyboardInterrupt exception:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import signal
import time
class TestClass:
    """Sample worker class whose ``foo`` method runs in a pool process."""

    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2

    def foo(self):
        """Simulate about two seconds of work in 0.1 s slices.

        Before each slice the module-global ``running`` flag is checked; the
        method returns early (with ``None``) as soon as it is false.
        """
        for _ in range(20):
            if not running:
                return
            time.sleep(.1)
async def task(
    loop: asyncio.AbstractEventLoop,
    executor_processes: ProcessPoolExecutor,
    i: int,
) -> None:
    """Run ``TestClass().foo`` in the process pool and log progress.

    Args:
        loop: Event loop used to schedule the executor call.
        executor_processes: Pool that executes ``foo``.
        i: Task number, used only in the progress messages.
    """
    print(f"[TASK {i}] Initializing Abck class")
    new_test = await loop.run_in_executor(executor_processes, TestClass().foo)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")
def ctrl_c_handler(*args, **kwargs):
    """SIGINT handler for pool processes: clear the ``running`` flag.

    The arguments supplied by the signal machinery are accepted and ignored.
    """
    global running
    running = False
def init_pool():
    """Pool initializer: set ``running`` and install the CTRL-C handler.

    Passed as ``initializer`` to the executor, so it runs in each pool
    process.
    """
    global running
    running = True
    # From now on CTRL-C in this process only clears the flag.
    signal.signal(signal.SIGINT, ctrl_c_handler)
async def main() -> None:
    """Run tasks 1 through 99 concurrently on a five-worker process pool."""
    # Inside a coroutine the running loop is the one to use.
    loop_ = asyncio.get_running_loop()
    # The context manager shuts the pool down when this coroutine finishes
    # or is cancelled, rather than leaving cleanup to interpreter exit.
    with ProcessPoolExecutor(max_workers=5, initializer=init_pool) as executor_processes:
        tasks = [task(loop_, executor_processes, i) for i in range(1, 100)]
        await asyncio.gather(*tasks)
# Guard the entry point: when worker processes are started with the spawn
# method they re-import this module, and unguarded top-level code would be
# executed again in every worker.
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("ctrl + c")
    print('Program finished')