What's the proper way to abort multiprocessing when one of the children aborts and/or throws an exception?
I found various related questions (generic multiprocessing error handling, how to close a multiprocessing pool on exception but without an answer, ...), but no clear answer on how to stop multiprocessing when a child raises an exception.
For instance, I expect the following code:
from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)

def main():
    with Pool(4) as p:
        try:
            r = p.map(f, range(7))
        except Exception as e:
            print(f"oops: {e}")
            p.close()
            p.terminate()
    print("end")

if __name__ == '__main__':
    main()
To output:
f(0)
f(1)
f(2)
oops: float division by zero
end
Instead, it applies the f function to all items before detecting/handling the exception:
f(0)
f(1)
f(2)
f(4)
f(3)
f(5)
f(6)
oops: float division by zero
end
Isn't there any way to catch the exception directly?
I think you're going to need apply_async for this, so you can act on every single result instead of on the cumulative result. pool.apply_async offers an error_callback parameter you can use to register your error handler. apply_async is not blocking, so you'll need to join() the pool. I'm also using a flag, terminated, to know when the results can be processed normally, i.e. when no exception occurred.
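from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)

def on_error(e):
    # Called in the parent process, by the thread that handles results,
    # as soon as any task raises.
    global terminated
    terminated = True
    pool.terminate()  # stop the workers without finishing outstanding tasks
    print(f"oops: {e}")

def main():
    global pool
    global terminated
    terminated = False
    pool = Pool(4)
    # Submit each item separately so every task reports its own error.
    results = [pool.apply_async(f, (x,), error_callback=on_error)
               for x in range(7)]
    pool.close()
    pool.join()  # returns once all tasks are done or the pool was terminated
    if not terminated:
        for r in results:
            print(r.get())
    print("end")

if __name__ == '__main__':
    main()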
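The question's code only reports the error at the end because Pool.map does not return, or re-raise a worker's exception, until every item has been processed. If you would rather keep a loop over results instead of a callback, one alternative (a sketch, not the answer's method) is to iterate over Pool.imap_unordered, which hands back each result as soon as a worker finishes it and re-raises a worker's exception in the parent when that item comes up. The helper name run is hypothetical, and the sketch assumes the items are independent, so the remaining ones can simply be abandoned:

```python
from multiprocessing import Pool
from time import sleep


def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)


def run(xs):
    """Hypothetical helper: return (results received before the first error,
    that error or None)."""
    collected = []
    with Pool(4) as p:
        try:
            # imap_unordered (chunksize defaults to 1) yields each result as
            # soon as its task finishes; a worker's exception is re-raised
            # here, in the parent, when that task's result comes up.
            for r in p.imap_unordered(f, xs):
                collected.append(r)
        except Exception as e:
            print(f"oops: {e}")
            # Leaving the with block calls p.terminate(), which kills the
            # workers instead of waiting for the remaining tasks.
            return collected, e
    return collected, None


if __name__ == "__main__":
    run(range(7))
    print("end")
```

Returning from inside the with block makes the pool's __exit__ call terminate(), so tasks still running or queued are dropped rather than waited for. Plain imap would also work, but it yields results in input order, so an error is only seen after every earlier item has finished.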