This Python program:

    import concurrent.futures
    import multiprocessing
    import time


    class A:

        def __init__(self):
            self.event = multiprocessing.Manager().Event()

        def start(self):
            try:
                while True:
                    if self.event.is_set():
                        break
                    print("processing")
                    time.sleep(1)
            except BaseException as e:
                print(type(e).__name__ + " (from pool thread):", e)

        def shutdown(self):
            self.event.set()


    if __name__ == "__main__":
        try:
            a = A()
            pool = concurrent.futures.ThreadPoolExecutor(1)
            future = pool.submit(a.start)
            while not future.done():
                concurrent.futures.wait([future], timeout=0.1)
        except BaseException as e:
            print(type(e).__name__ + " (from main thread):", e)
        finally:
            a.shutdown()
            pool.shutdown()

outputs:

    processing
    processing
    processing
    KeyboardInterrupt (from main thread):
    BrokenPipeError (from pool thread): [WinError 232] The pipe is being closed
    Traceback (most recent call last):
      File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 788, in _callmethod
        conn = self._tls.connection
    AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File ".\foo.py", line 34, in <module>
        a.shutdown()
      File ".\foo.py", line 21, in shutdown
        self.event.set()
      File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 1067, in set
        return self._callmethod('set')
      File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 792, in _callmethod
        self._connect()
      File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 779, in _connect
        conn = self._Client(self._token.address, authkey=self._authkey)
      File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 490, in Client
        c = PipeClient(address)
      File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 691, in PipeClient
        _winapi.WaitNamedPipe(address, 1000)
    FileNotFoundError: [WinError 2] The system cannot find the file specified

when it is run and a SIGINT signal is sent after three seconds (by pressing Ctrl+C).

Analysis. The SIGINT signal is delivered to each process, and Python runs its handler in the main thread of that process. In this case there are two processes: the main process and the manager's child process.

- In the main process, the default SIGINT signal handler raises the KeyboardInterrupt exception in the main thread, which is caught and printed.
- In the manager's child process, the default SIGINT signal handler raises a KeyboardInterrupt exception, which terminates the child process. Consequently all subsequent uses of the manager's shared objects by other processes raise a BrokenPipeError exception.
- In the pool thread of the main process, this BrokenPipeError exception is raised at the line if self.event.is_set():.
- In the main thread of the main process, the flow of control then reaches the line a.shutdown(), which raises the AttributeError and FileNotFoundError exceptions.

How to prevent this BrokenPipeError exception?

A solution to this issue is to override the default SIGINT signal handler in the manager's child process with a handler that ignores the signal, for instance the standard signal.SIG_IGN handler. This can be done by passing to the manager's start method an initializer that calls the signal.signal function; the initializer runs at the start of the manager's child process:

    import concurrent.futures
    import multiprocessing.managers
    import signal
    import time


    def init():
        # Runs in the manager's child process: ignore SIGINT (Ctrl+C) there.
        signal.signal(signal.SIGINT, signal.SIG_IGN)


    class A:

        def __init__(self):
            # Start the manager explicitly so that init can be passed as its initializer.
            manager = multiprocessing.managers.SyncManager()
            manager.start(init)
            self.event = manager.Event()

        def start(self):
            try:
                # Poll the shared event once per second until shutdown() sets it.
                while True:
                    if self.event.is_set():
                        break
                    print("processing")
                    time.sleep(1)
            except BaseException as e:
                print(type(e).__name__ + " (from pool thread):", e)

        def shutdown(self):
            self.event.set()


    if __name__ == "__main__":
        try:
            a = A()
            pool = concurrent.futures.ThreadPoolExecutor(1)
            future = pool.submit(a.start)
            # Wait in short slices so that the main thread can receive Ctrl+C.
            while not future.done():
                concurrent.futures.wait([future], timeout=0.1)
        except BaseException as e:
            print(type(e).__name__ + " (from main thread):", e)
        finally:
            # The manager is still alive here, so setting the event succeeds.
            a.shutdown()
            pool.shutdown()

Note. The program above also works with a concurrent.futures.ProcessPoolExecutor.
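
The effect of the init function can also be checked in a single process, without a manager. The sketch below is only an illustration: the helper names interrupted_by_sigint and demo are made up for this example, and it assumes Python 3.8 or later for signal.raise_signal (the traceback above comes from Python 3.7, where that function is not available). It sends SIGINT to the current process, first with Python's default handler and then after calling init.

```python
import signal


def init():
    # Same initializer as in the program above: ignore SIGINT (Ctrl+C).
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def interrupted_by_sigint():
    # Hypothetical helper: send SIGINT to the current process and report
    # whether it surfaced as a KeyboardInterrupt exception.
    try:
        signal.raise_signal(signal.SIGINT)  # assumes Python 3.8+
        return False
    except KeyboardInterrupt:
        return True


def demo():
    # Hypothetical helper: compare the default handler with SIG_IGN.
    # Must be called from the main thread, like any signal.signal call.
    previous = signal.getsignal(signal.SIGINT)  # remember the current handler
    try:
        # Python's default handler turns SIGINT into KeyboardInterrupt.
        signal.signal(signal.SIGINT, signal.default_int_handler)
        with_default = interrupted_by_sigint()
        # After init(), the signal is discarded instead.
        init()
        with_ignore = interrupted_by_sigint()
    finally:
        # Restore the handler that was installed before the demo, if known.
        if previous is not None:
            signal.signal(signal.SIGINT, previous)
    return with_default, with_ignore


if __name__ == "__main__":
    print(demo())
```

Run as a script, this should print (True, False): the default handler raises KeyboardInterrupt, whereas after init the signal is ignored. This is the behaviour that keeps the manager's child process alive when Ctrl+C is pressed.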