Suppose I have a program that looks like this:
```python
jobs = [list_of_values_to_consume_and_act]
with multiprocessing.Pool(8) as pool:
    results = pool.map(func, jobs)
```
And whatever is done in `func` can raise an exception due to external circumstances, so I can't prevent an exception from happening.
How will the pool behave on exception?
Will it only terminate the process that raised an exception and let other processes run and consume the jobs?
If yes, will it start another process to pick up the slack?
What about the job being handled by the dead process, will it be 'resubmitted' to the pool?
In any case, how do I 'retrieve' the exception?
Each worker process in the pool runs the tasks it receives inside a `try...except` block. If an exception is caught, the worker does not die: it sends the exception back to the main process, where the appropriate handler thread passes it forward so it can be re-raised when you collect the results. So no replacement process is started and the failed job is not resubmitted; the worker simply moves on to its next task.

Whether or not other jobs will execute depends on whether the pool is still open. If you do not catch this re-raised exception, the main process (or the process that started the pool) will exit, automatically cleaning up open resources like the pool (so no more tasks can be executed, since the pool is closed). But if you catch the exception and let the main process continue running, the pool will not shut down and the other jobs will execute as scheduled.

So why not wrap `pool.map` in a `try...except` block? Do note that if one of your jobs does raise an error, the results of the other, successful jobs become inaccessible as well (because they are only returned after the call to `pool.map` completes, and that call never completed successfully). In cases where you need to catch the exceptions of individual jobs, it's better to use `pool.imap` or `pool.apply_async`.
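A minimal sketch of the `pool.map` approach, assuming a hypothetical `risky` task that fails for one input. The worker's `ValueError` surfaces in the parent; because it is caught inside the `with` block, the pool stays open and accepts more work (leaving the `with` block is what calls `pool.terminate()`). The results of the jobs that did succeed in the failed call are lost:

```python
import multiprocessing

def risky(value):
    # Hypothetical task: fails for one specific input.
    if value == 3:
        raise ValueError(f"Error for value {value}")
    return value * 10

def main():
    with multiprocessing.Pool(3) as pool:
        try:
            results = pool.map(risky, range(1, 6))
        except ValueError as e:
            # The worker's exception is re-raised here, in the parent process.
            print(f"pool.map failed: {e}")
            results = None  # the successful jobs' results are lost with it
        # The exception was caught, so the pool is still open and usable.
        more = pool.map(risky, [4, 5])
    return results, more

if __name__ == "__main__":
    print(main())
```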
Example of catching exceptions for individual tasks using `imap`:
```python
import multiprocessing
import time

def prt(value):
    if value == 3:
        raise ValueError(f"Error for value {value}")
    time.sleep(1)
    return value

if __name__ == "__main__":
    with multiprocessing.Pool(3) as pool:
        jobs = pool.imap(prt, range(1, 10))
        results = []
        while True:
            try:
                # next() re-raises the exception of this particular task, if any
                result = next(jobs)
            except ValueError as e:
                print(e)
                results.append("N/A")  # This means that this individual task was unsuccessful
            except StopIteration:
                break  # all results have been consumed
            else:
                results.append(result)
    print(results)
```
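If you would rather keep `pool.map`, a common alternative (not part of the approach above) is to catch the exception inside the worker and return it as an ordinary value. A minimal sketch, assuming a hypothetical wrapper `safe_prt` around the same `prt` task (with the `time.sleep` dropped for brevity); the exception object is pickled back to the parent like any other result:

```python
import multiprocessing

def prt(value):
    # Same toy task as above: fails for value 3.
    if value == 3:
        raise ValueError(f"Error for value {value}")
    return value

def safe_prt(value):
    # Hypothetical wrapper: catch the exception inside the worker and
    # return it as a value, so pool.map itself never raises for it.
    try:
        return prt(value)
    except Exception as e:
        return e

def main():
    with multiprocessing.Pool(3) as pool:
        outcomes = pool.map(safe_prt, range(1, 10))
    # Separate successes from failures after the fact.
    results = ["N/A" if isinstance(o, Exception) else o for o in outcomes]
    errors = [o for o in outcomes if isinstance(o, Exception)]
    return results, errors

if __name__ == "__main__":
    results, errors = main()
    print(results)  # [1, 2, 'N/A', 4, 5, 6, 7, 8, 9]
    print(errors)
```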
Example of catching exceptions for individual tasks using `apply_async`:
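```python
import multiprocessing
import time

def prt(value):
    if value == 3:
        raise ValueError(f"Error for value {value}")
    time.sleep(1)
    return value

if __name__ == "__main__":
    pool = multiprocessing.Pool(3)
    jobs = [pool.apply_async(prt, (i,)) for i in range(1, 10)]
    results = []
    for j in jobs:
        try:
            # get() re-raises the exception of this particular task, if any
            results.append(j.get())
        except ValueError as e:
            print(e)
            results.append("N/A")  # This individual task was unsuccessful
    # The pool was not created with "with", so shut it down explicitly.
    pool.close()
    pool.join()
    print(results)
```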
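Since the pool never resubmits a failed job by itself, retries have to be done by hand. A minimal sketch using `apply_async`, assuming a hypothetical `flaky` task that fails at random with `OSError` (standing in for the external circumstances in the question) and a hypothetical helper `run_with_retries`. It keys results by input, so it assumes the inputs are unique and hashable:

```python
import multiprocessing
import random

def flaky(value):
    # Hypothetical task that fails at random, standing in for errors
    # caused by external circumstances (network, disk, ...).
    if random.random() < 0.3:
        raise OSError(f"transient failure for {value}")
    return value * 2

def run_with_retries(pool, func, items, max_attempts=3):
    # Resubmit failed items manually, up to max_attempts times each.
    results = {}
    pending = list(items)
    for _ in range(max_attempts):
        async_results = {item: pool.apply_async(func, (item,)) for item in pending}
        pending = []
        for item, ar in async_results.items():
            try:
                results[item] = ar.get()
            except OSError:
                pending.append(item)  # schedule for another attempt
        if not pending:
            break
    return results, pending  # pending: items that failed every attempt

def main():
    with multiprocessing.Pool(3) as pool:
        return run_with_retries(pool, flaky, range(1, 10))

if __name__ == "__main__":
    results, failed = main()
    print(results)
    print("gave up on:", failed)
```

Catching only the exception type you consider transient (here `OSError`) keeps genuine bugs from being silently retried.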