python multiprocessing timeout pool

How to fork and join multiple subprocesses with a global timeout in Python?


I want to execute some tasks in parallel in multiple subprocesses and time out if the tasks are not completed within a given delay.

A first approach is to fork and join the subprocesses individually, giving each join the time remaining until the global timeout, as suggested in this answer. It works fine for me.
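
For reference, here is a minimal sketch of what I mean by joining with remaining timeouts. The names task and join_all are placeholders I made up for illustration; they do not come from the linked answer, and the sleep only simulates real work.

```python
import multiprocessing
import time


def task(seconds):
    # Hypothetical placeholder task: sleep to simulate work.
    time.sleep(seconds)


def join_all(processes, timeout):
    """Join every process against one shared deadline.

    Returns True if all processes finished before the deadline.
    """
    deadline = time.monotonic() + timeout
    for process in processes:
        # Each join only gets the time left until the global deadline.
        remaining = max(0.0, deadline - time.monotonic())
        process.join(remaining)
    return all(not process.is_alive() for process in processes)


if __name__ == "__main__":
    processes = [multiprocessing.Process(target=task, args=(0.2,)) for _ in range(3)]
    for process in processes:
        process.start()

    finished = join_all(processes, timeout=5)
    if not finished:
        # Stop whatever is still running once the global timeout has expired.
        for process in processes:
            if process.is_alive():
                process.terminate()
    print("all finished:", finished)
```

The deadline is computed once with time.monotonic(), so the total wait stays bounded by the global timeout no matter how many processes are joined.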

A second approach, which I want to use here, is to create a pool of subprocesses and wait on it with the global timeout, as suggested in this answer.

However, I have a problem with the second approach: after feeding the pool of subprocesses with tasks that hold multiprocessing.Event() objects, waiting for their completion raises this exception:

RuntimeError: Condition objects should only be shared between processes through inheritance

Here is the Python code snippet:

import multiprocessing.pool
import time


class Worker:

    def __init__(self):
        self.event = multiprocessing.Event()  # commenting this removes the RuntimeError

    def work(self, x):
        time.sleep(1)
        return x * 10


if __name__ == "__main__":
    pool_size = 2
    timeout = 5

    with multiprocessing.pool.Pool(pool_size) as pool:
        result = pool.map_async(Worker().work, [4, 5, 2, 7])
        print(result.get(timeout))  # raises the RuntimeError

Solution

  • In the "Programming guidelines" section of the multiprocessing — Process-based parallelism documentation, there is this paragraph:

    Better to inherit than pickle/unpickle

    When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

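    So multiprocessing.Event() caused a RuntimeError because it is not picklable: pool.map_async has to pickle the bound method Worker().work, and with it the Worker instance and its event attribute, in order to send the tasks to the worker processes. The following Python code snippet demonstrates this: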

    import multiprocessing
    import pickle
    
    pickle.dumps(multiprocessing.Event())
    

    which raises the same exception:

    RuntimeError: Condition objects should only be shared between processes through inheritance

    A solution is to use a proxy object:

    A proxy is an object which refers to a shared object which lives (presumably) in a different process.

    because:

    An important feature of proxy objects is that they are picklable so they can be passed between processes.

    multiprocessing.Manager().Event() creates a shared threading.Event() object and returns a proxy for it, so replacing this line:

    self.event = multiprocessing.Event()
    

    with the following line in the Python code snippet of the question solves the problem:

    self.event = multiprocessing.Manager().Event()
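
Putting it together, a minimal sketch of the question's snippet with that fix applied might look like the code below. Two things here are my own assumptions rather than part of the original code: the manager is created explicitly in a with block and its event proxy is passed to Worker, so the manager process is shut down deterministically, and the helper function run is a hypothetical wrapper added for illustration. The sleep is also shortened to keep the example quick.

```python
import multiprocessing
import time


class Worker:

    def __init__(self, event):
        # `event` is expected to be a manager proxy, which is picklable,
        # unlike a plain multiprocessing.Event().
        self.event = event

    def work(self, x):
        time.sleep(0.1)
        self.event.set()  # the proxy forwards the call to the manager process
        return x * 10


def run(values, pool_size=2, timeout=5):
    # Hypothetical helper: run Worker.work over `values` with a global timeout.
    with multiprocessing.Manager() as manager:
        worker = Worker(manager.Event())
        with multiprocessing.Pool(pool_size) as pool:
            result = pool.map_async(worker.work, values)
            # Raises multiprocessing.TimeoutError if the tasks are not done in time.
            return result.get(timeout)


if __name__ == "__main__":
    print(run([4, 5, 2, 7]))  # prints [40, 50, 20, 70]
```

Keeping the pool inside the manager's with block means the pool is terminated first, so no worker is left talking to a manager that has already shut down.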