Behavior of multiprocessing.Pool on exception?


Suppose I have a program that looks like this:

jobs = [list_of_values_to_consume_and_act]
with multiprocessing.Pool(8) as pool:
    results = pool.map(func, jobs)

And whatever is done in func can raise an exception due to external circumstances, so I can't prevent an exception from happening.

How will the pool behave on exception?

  1. Will it only terminate the process that raised an exception and let other processes run and consume the jobs?

  2. If yes, will it start another process to pick up the slack?

  3. What about the job being handled by the dead process, will it be 'resubmitted' to the pool?

  4. In any case, how do I 'retrieve' the exception?


Solution

    1. No processes will be terminated at all. Inside each worker process, every call to the target function is wrapped in a try...except block. If an exception is caught, the worker sends it back to the main process as that task's result, and a result-handler thread there passes it on so it can be re-raised when you collect the results (for example, from pool.map). Whether other jobs still execute depends on whether the pool is still open. If the re-raised exception propagates out of the with block, the block's exit calls pool.terminate(), so no further tasks run; if nothing catches it at all, the main process (or whichever process started the pool) exits. But if you catch the exception inside the with block and let the main process continue, the pool stays open and the other jobs execute as scheduled.
    2. Not applicable: the worker process survives the exception, so there is no slack to pick up.
    3. The outcome of a job is irrelevant: once a job has been run by any worker, it is marked completed and is not resubmitted to the pool, whether it succeeded or raised.
    4. Wrap your call to pool.map in a try...except block. Note, however, that if any one of your jobs raises an error, the results of the other, successful jobs become inaccessible as well: pool.map only returns its list of results when every job succeeds, and otherwise raises the exception instead. Where you need to catch the exceptions of individual jobs, it is better to use pool.imap or pool.apply_async.
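
    A minimal sketch of points 1 and 2, using a hypothetical work function and a 2-process pool: the exception from one pool.map call is caught inside the with block, after which the same pool runs another map. Each task also returns os.getpid(), so the set of PIDs seen before and after the failure should never contain more than the two original workers, meaning no worker was killed or replaced. This assumes the default maxtasksperchild=None, under which workers are only replaced if they actually exit.

```python
import multiprocessing
import os


def work(value):
    # Hypothetical task: fails for value 3, otherwise reports which worker ran it.
    if value == 3:
        raise ValueError(f"Error for value {value}")
    return value, os.getpid()


def run_demo():
    ok_inputs = [1, 2, 4, 5]
    with multiprocessing.Pool(2) as pool:
        before = {pid for _, pid in pool.map(work, ok_inputs)}
        # Catch the exception INSIDE the with block: if it propagated out,
        # Pool.__exit__() would call terminate() and stop the workers.
        try:
            pool.map(work, range(1, 6))
            error = None
        except ValueError as e:
            error = str(e)
        # The pool is still open, so it keeps accepting new work.
        results = pool.map(work, ok_inputs)
    after = {pid for _, pid in results}
    values = [value for value, _ in results]
    return error, values, before | after


if __name__ == "__main__":
    error, values, worker_pids = run_demo()
    print(error)   # Error for value 3
    print(values)  # [1, 2, 4, 5]
    # At most the 2 original workers ever ran a task: none was replaced.
    print(len(worker_pids))
```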

    Example of catching the exceptions of individual tasks using imap:

    import multiprocessing
    import time
    
    
    def prt(value):
        if value == 3:
            raise ValueError(f"Error for value {value}")
        
        time.sleep(1)
        return value
    
    
    if __name__ == "__main__":
        with multiprocessing.Pool(3) as pool:
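            results_iter = pool.imap(prt, range(1, 10))
            results = []
    
            # imap yields results in input order; a task that failed re-raises
            # its exception from next() without stopping the remaining tasks.
            while True:
                try:
                    result = next(results_iter)
                except ValueError as e:
                    print(e)
                    results.append("N/A")  # This individual task was unsuccessful
                except StopIteration:
                    break  # every task has been consumed
                else:
                    results.append(result)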
    
        print(results)
    
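    If you would rather keep using pool.map, another option (a different technique from the imap approach above) is to catch the exception inside the worker and return it as an ordinary value. The safe_prt wrapper and collect helper below are hypothetical, and the sketch assumes the exceptions your function raises are picklable, since they travel back to the main process like any other result.

```python
import multiprocessing


def prt(value):
    # Toy task modeled on prt above: fails for value 3.
    if value == 3:
        raise ValueError(f"Error for value {value}")
    return value


def safe_prt(value):
    # Hypothetical wrapper: runs in the worker and turns an exception into a
    # return value, so pool.map itself never sees a failure.
    try:
        return prt(value)
    except Exception as e:
        return e


def collect(values):
    with multiprocessing.Pool(3) as pool:
        return pool.map(safe_prt, values)


if __name__ == "__main__":
    outcomes = collect(range(1, 10))
    results = ["N/A" if isinstance(o, Exception) else o for o in outcomes]
    print(results)  # [1, 2, 'N/A', 4, 5, 6, 7, 8, 9]
```

    The trade-off is that the caller must check every outcome with isinstance() instead of relying on the exception being raised.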

    Example of catching the exceptions of individual tasks using apply_async:

    import multiprocessing
    import time
    
    
    def prt(value):
        if value == 3:
            raise ValueError(f"Error for value {value}")
    
        time.sleep(1)
        return value
    
    
    if __name__ == "__main__":
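        with multiprocessing.Pool(3) as pool:
            async_results = [pool.apply_async(prt, (i,)) for i in range(1, 10)]
            results = []
    
            # get() blocks until that task finishes, then returns its value
            # or re-raises the exception it raised in the worker.
            for async_result in async_results:
                try:
                    results.append(async_result.get())
                except ValueError as e:
                    print(e)
                    results.append("N/A")  # This individual task was unsuccessful
    
        print(results)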
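
    apply_async also accepts callback and error_callback arguments, so failures can be collected without calling get() on each result. In this sketch (the run helper and the dicts keyed by input value are illustrative choices, not part of the answer), error_callback receives the exception instance in the main process. The callbacks run in the pool's result-handling thread, so they should return quickly. Calling pool.close() and then pool.join() waits until every task, and therefore every callback, has finished.

```python
import multiprocessing
from functools import partial


def prt(value):
    # Toy task modeled on prt above: fails for value 3.
    if value == 3:
        raise ValueError(f"Error for value {value}")
    return value


def run(values):
    results = {}  # input value -> return value
    errors = {}   # input value -> exception instance
    with multiprocessing.Pool(3) as pool:
        for v in values:
            pool.apply_async(
                prt,
                (v,),
                callback=partial(results.__setitem__, v),
                error_callback=partial(errors.__setitem__, v),
            )
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for all tasks and their callbacks to finish
    return results, errors


if __name__ == "__main__":
    results, errors = run(range(1, 10))
    print(sorted(results))                         # inputs that succeeded
    print({v: str(e) for v, e in errors.items()})  # {3: 'Error for value 3'}
```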