Tags: python, python-3.x, python-multiprocessing

early exit from multiprocessing.Pool.map (raise in child process doesn't work)


My reproduction is wrong, as noted in Rugnar's answer. I'm leaving the code mostly as-is, since I'm not sure where a fix would fall between clarifying the question and changing its meaning.

I have some thousands of jobs that I need to run and would like any error to halt execution immediately. I wrap the task in a try / except / raise so that I can log the error (without all the multiprocessing/threading noise) and then re-raise. This does not kill the main process.

What's going on, and how can I get the early exit I'm looking for? sys.exit(1) in the child deadlocks, and wrapping the try / except / raise function in yet another function doesn't work either.

$ python3 mp_reraise.py
(0,)
(1,)
(2,)
(3,)
(4,)
(5,)
(6,)
(7,)
(8,)
(9,)
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "mp_reraise.py", line 5, in f_reraise
    raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_reraise.py", line 14, in <module>
    test_reraise()
  File "mp_reraise.py", line 12, in test_reraise
    p.map(f_reraise, range(10))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: (0,)

mp_reraise.py

import multiprocessing

def f_reraise(*args):
    try:
        raise Exception(args)
    except Exception as e:
        print(e)
        raise

def test_reraise():
    with multiprocessing.Pool() as p:
        p.map(f_reraise, range(10))

test_reraise()

If I don't catch and re-raise, execution stops early as expected: [this actually does not stop early either, as per Rugnar's answer]

$ python3 mp_raise.py 
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "mp_raise.py", line 4, in f_raise
    raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_raise.py", line 10, in <module>
    test_raise()
  File "mp_raise.py", line 8, in test_raise
    p.map(f_raise, range(10))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: (0,)  

mp_raise.py

import multiprocessing

def f_raise(*args):
    # missing print, which would demonstrate that
    # this actually does not stop early
    raise Exception(args)

def test_raise():
    with multiprocessing.Pool() as p:
        p.map(f_raise, range(10))

test_raise()

Solution

  • In your mp_raise.py you don't print anything, so you can't see how many jobs were done. I added a print and found out that the pool only sees a child's exception once the jobs iterator is exhausted, so it never stops early.
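
    For reference, here is a minimal instrumented version of mp_raise.py (a sketch, adding only the print that the question's own comment says is missing). The inputs keep printing in the children before the parent finally re-raises, which shows that nothing stops early:

    import multiprocessing

    def f_raise(*args):
        print(args)  # every job still runs before the parent re-raises
        raise Exception(args)

    def test_raise():
        with multiprocessing.Pool() as p:
            p.map(f_raise, range(10))

    if __name__ == '__main__':
        test_raise()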

    If you need to stop early after an exception, try this:

    import time
    import multiprocessing as mp
    
    
    def f_reraise(i):
        if abort.is_set():  # skip the job entirely if an abort has already happened
            return
        time.sleep(i / 1000)  # simulate real work so jobs are not instantaneous
        if abort.is_set():  # check again mid-job so running work can also bail out
            return
        print(i)
        try:
            raise Exception(i)
        except Exception as e:
            abort.set()
            print('error:', e)
            raise
    
    
    def init(a):
        global abort
        abort = a
    
    
    def test_reraise():
        _abort = mp.Event()
    
        # stop feeding new jobs once an abort has happened; note that
        # Pool.map() consumes a generator up front, so this lazy guard
        # only takes effect with lazy variants such as imap()
        def pool_args():
            for i in range(100):
                if not _abort.is_set():
                    yield i
    
        # initializer/initargs is a way to share the Event with the worker processes
        # thanks to https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes
        with mp.Pool(8, initializer=init, initargs=(_abort,)) as p:
            p.map(f_reraise, pool_args())
    
    
    if __name__ == '__main__':
        test_reraise()
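
    As a side note, here is a different early-exit pattern (a sketch, not part of the answer above, relying only on documented Pool behaviour): Pool's __exit__ calls terminate(), so if results are consumed lazily with imap_unordered, the first exception re-raises in the parent as soon as that result arrives, and leaving the with block then kills the remaining workers instead of waiting for them.

    import multiprocessing as mp

    def work(i):
        print('running', i)
        if i == 50:  # hypothetical failure partway through the jobs
            raise Exception(i)

    def test_early_exit():
        # __exit__ calls p.terminate(), so an exception escaping this
        # block tears the pool down instead of draining all 1000 jobs
        with mp.Pool(8) as p:
            for _ in p.imap_unordered(work, range(1000)):
                pass  # the first failed result re-raises here

    if __name__ == '__main__':
        test_early_exit()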