Tags: python, multiprocessing, pool, robustness

Multiprocessing Robust to Occasional Failures


I have 100 to 1000 time-series paths and a fairly expensive simulation that I'd like to parallelize. However, the library I'm using occasionally hangs, and I'd like the code to be robust to that. This is the current setup:

from multiprocessing import Pool

with Pool() as pool:
    res = pool.map_async(simulation_that_occasionally_hangs, paths)
    all_costs = res.get()  # blocks until every path has finished

I know get() accepts a timeout parameter, but if I understand correctly it applies to the whole batch of 1000 paths. What I'd like is to check whether any single simulation is taking longer than 5 minutes (a normal path takes 4 seconds) and, if so, stop that path and still get() the results for the rest.
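If adding a dependency is not an option, a per-path timeout can be sketched with the standard library alone: give each path its own process and kill any process that outlives its time budget. This is a minimal sketch under the assumption that one process per path is acceptable; map_with_timeout, _run_one and fake_simulation are hypothetical names, and fake_simulation only stands in for simulation_that_occasionally_hangs.

```python
import multiprocessing as mp
import time
from collections import deque


def fake_simulation(path):
    """Hypothetical stand-in for simulation_that_occasionally_hangs."""
    if path < 0:          # a negative "path" simulates a hang
        time.sleep(60)
    return path * 2


def _run_one(func, arg, conn):
    # Executed in the child process: compute one result and send it back.
    conn.send(func(arg))
    conn.close()


def map_with_timeout(func, args, timeout, max_workers=4):
    """Call func(arg) for every arg, running at most max_workers at once.

    Each call runs in its own process. A call still running after `timeout`
    seconds, or whose process died without sending a result (e.g. it raised
    or crashed), is recorded as None; every other return value is kept.
    """
    args = list(args)
    results = [None] * len(args)
    todo = deque(enumerate(args))
    running = {}  # index -> (process, read end of pipe, start time)

    while todo or running:
        # Fill free slots with new processes.
        while todo and len(running) < max_workers:
            i, arg = todo.popleft()
            recv_conn, send_conn = mp.Pipe(duplex=False)
            proc = mp.Process(target=_run_one, args=(func, arg, send_conn))
            proc.start()
            send_conn.close()  # the parent only reads
            running[i] = (proc, recv_conn, time.monotonic())

        time.sleep(0.05)  # simple polling interval
        for i, (proc, conn, start) in list(running.items()):
            # Check liveness *before* polling, so a result sent just before
            # the child exited is not mistaken for a crash.
            alive = proc.is_alive()
            if conn.poll():
                try:
                    results[i] = conn.recv()    # finished normally
                except EOFError:
                    pass                        # child exited without a result
            elif alive and time.monotonic() - start < timeout:
                continue                        # still within its budget
            else:
                proc.terminate()                # hung past timeout, or died
            proc.join()
            conn.close()
            del running[i]
    return results


if __name__ == "__main__":  # required with the spawn start method (Windows default)
    print(map_with_timeout(fake_simulation, [1, -1, 3], timeout=2.0))
    # prints [2, None, 6]
```

For the real workload this would be called as map_with_timeout(simulation_that_occasionally_hangs, paths, timeout=300). Starting a fresh process per path adds some overhead (with spawn, each child re-imports the main module), which should be small next to a 4-second simulation; a pool that reuses workers and only replaces a timed-out one avoids that cost.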

EDIT:

Testing timeout in pebble

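from concurrent.futures import TimeoutError  # pebble raises this; not the built-in before Python 3.11
from pebble import ProcessPool


def fibonacci(n):
    if n == 0: return 0
    elif n == 1: return 1
    else: return fibonacci(n - 1) + fibonacci(n - 2)


def main():
    with ProcessPool() as pool:
        future = pool.map(fibonacci, range(40), timeout=10)
        iterator = future.result()

        results = []  # avoid shadowing the built-in all()
        while True:
            try:
                results.append(next(iterator))
            except StopIteration:
                break
            except TimeoutError as e:
                print(f'function took longer than {e.args[1]} seconds')

        print(results)


if __name__ == '__main__':  # required on Windows, where worker processes are spawned
    main()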

Errors:

RuntimeError: I/O operations still in flight while destroying Overlapped object, the process may crash
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\anaconda3\lib\multiprocessing\spawn.py", line 99, in spawn_main
    new_handle = reduction.steal_handle(parent_pid, pipe_handle)
  File "C:\anaconda3\lib\multiprocessing\reduction.py", line 87, in steal_handle
    _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
PermissionError: [WinError 5] Access is denied

Solution

  • The pebble library was designed to address exactly these kinds of issues. It transparently handles job timeouts and failures such as crashes in C libraries.

    The examples in its documentation show how to use it; its interface is similar to that of concurrent.futures.
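For comparison, here is a minimal standard-library sketch of why a plain executor is not enough. With concurrent.futures.ProcessPoolExecutor, future.result(timeout=...) only limits how long the caller waits: once a task has been handed to a worker, future.cancel() returns False and the worker keeps running it. stuck_task and wait_with_timeout are hypothetical names, and the sleep stands in for a hung simulation.

```python
import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError


def stuck_task(seconds):
    # Stand-in for a hung simulation: it simply sleeps.
    time.sleep(seconds)
    return seconds


def wait_with_timeout():
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(stuck_task, 3)
        try:
            future.result(timeout=0.5)   # only bounds how long we wait
            timed_out = False
        except TimeoutError:
            timed_out = True
        # The future has already been handed to a worker (state RUNNING),
        # so cancel() returns False and the task keeps running.
        cancelled = future.cancel()
    # Leaving the with-block still waits for stuck_task to finish (~3 s).
    return timed_out, cancelled


if __name__ == "__main__":
    print(wait_with_timeout())  # expected: (True, False)
```

pebble's documentation, by contrast, describes terminating a task whose timeout expires, which is the per-path behavior the question asks for.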