
Why won't Python Multiprocessing Workers die?


I'm using Python's multiprocessing module to map a function across a list of elements. Something along the lines of this:

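import multiprocessing

def computeStuff(arguments, globalData, concurrent=True):
    # Each worker process runs initWorker(globalData) once at startup.
    pool = multiprocessing.Pool(initializer=initWorker, initargs=(globalData,))
    results = pool.map(workerFunction, list(enumerate(arguments)))
    return results

def initWorker(globalData):
    # Stash the shared data as an attribute on the worker function.
    workerFunction.globalData = globalData

def workerFunction(indexedArgument):
    # Unpack in the body; tuple parameters in the signature are Python 2 only.
    index, argument = indexedArgument
    ... # computation here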

I generally run tests in IPython using both CPython and PyPy. I have noticed that the spawned processes often don't get killed, so they accumulate, each using a gigabyte of RAM. This happens when hitting ctrl-k during a computation, which sends multiprocessing into a frenzy of confusion. But even when I let the computation finish, those processes won't die in PyPy.

According to the documentation, when the pool gets garbage collected, it should call terminate() and kill all the processes. What's happening here? Do I have to explicitly call close()? If yes, is there some sort of context manager that properly manages closing the resources (i.e. processes)?

This is on Mac OS X Yosemite.


Solution

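  • PyPy's garbage collection is lazy: it does not use reference counting, so an unreferenced Pool is not finalized the moment its last reference goes away. If you never call close, the Pool is cleaned up "sometime", and that might not mean "anytime soon".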
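
The timing difference can be seen without any processes at all. The following is a minimal sketch, not taken from the original answer: Resource and finalized_after_drop are hypothetical names, and Resource merely stands in for an object (such as a Pool) whose cleanup is tied to finalization.

```python
import gc


class Resource(object):
    """Hypothetical stand-in for an object whose cleanup runs in __del__."""
    finalized = False

    def __del__(self):
        # Runs whenever the interpreter decides to finalize the object.
        Resource.finalized = True


def finalized_after_drop(force_collect):
    """Drop the only reference and report whether __del__ has run yet."""
    Resource.finalized = False
    res = Resource()
    del res  # the last reference is gone
    if force_collect:
        gc.collect()  # explicitly request a collection
    return Resource.finalized
```

On CPython, reference counting finalizes the object as soon as the last reference disappears, so finalized_after_drop(False) already reports True. On PyPy the same call will typically report False, because finalization waits for a later collection; that is the situation an unclosed Pool ends up in.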

    Once the Pool is properly closed, the workers exit when they run out of tasks. An easy way to ensure the Pool is closed in pre-3.3 Python is:

    from contextlib import closing
    
    def computeStuff(arguments, globalData, concurrent=True):
        with closing(multiprocessing.Pool(initializer=initWorker, initargs=(globalData,))) as pool:
            return pool.map(workerFunction, enumerate(arguments))
    

    Note: I also removed the explicit conversion to list (pointless, since map will iterate the enumerate iterator for you), and returned the results directly (no need to assign to a name only to return on the next line).

    If you want to ensure immediate termination in the exception case (on pre-3.3 Python), you'd use a try/finally block, or write a simple context manager (which could be reused for other places where you use a Pool):

    from contextlib import contextmanager
    
    @contextmanager
    def terminating(obj):
        try:
            yield obj
        finally:
            obj.terminate()
    
    def computeStuff(arguments, globalData, concurrent=True):
        with terminating(multiprocessing.Pool(initializer=initWorker, initargs=(globalData,))) as pool:
            return pool.map(workerFunction, enumerate(arguments))
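
For comparison, the try/finally variant mentioned above might look like the sketch below. It reuses the question's function names; the body of workerFunction is a placeholder assumption (the original elides the real computation), and the unused concurrent parameter is dropped for brevity.

```python
import multiprocessing


def initWorker(globalData):
    # Stash the shared data on the worker function, as in the question.
    workerFunction.globalData = globalData


def workerFunction(indexedArgument):
    index, argument = indexedArgument
    # Placeholder computation (an assumption; the real work is elided in the question).
    return index, argument * workerFunction.globalData


def computeStuff(arguments, globalData):
    pool = multiprocessing.Pool(initializer=initWorker, initargs=(globalData,))
    try:
        return pool.map(workerFunction, enumerate(arguments))
    finally:
        # Runs on normal return and on exceptions alike, killing the workers.
        pool.terminate()
```

This behaves like the terminating context manager, but the cleanup is written out at each call site, which is why the reusable wrapper is usually the tidier choice once more than one function creates a Pool. On platforms that start workers by spawning a fresh interpreter, the call to computeStuff should sit under an if __name__ == "__main__": guard.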
    

    The terminating approach is superior in that it guarantees the processes exit immediately. In theory, if you're using threads elsewhere in your main program, the Pool workers might be forked with non-daemon threads, which would keep the processes alive even after the worker task thread exited; terminating sidesteps this by killing the processes forcibly.

    If your interpreter is Python 3.3 or higher, the terminating approach is built into Pool: leaving the with block calls terminate(), so no special wrapper is needed and with multiprocessing.Pool(initializer=initWorker, initargs=(globalData,)) as pool: works directly.
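
Put together for Python 3.3 or later, a complete version could look like the following sketch. The function names come from the question; the body of workerFunction is again a placeholder assumption standing in for the elided computation.

```python
import multiprocessing


def initWorker(globalData):
    workerFunction.globalData = globalData


def workerFunction(indexedArgument):
    index, argument = indexedArgument
    # Placeholder computation (an assumption, not the question's real work).
    return index, argument + workerFunction.globalData


def computeStuff(arguments, globalData):
    # Leaving the with block calls pool.terminate(), even if map raises.
    with multiprocessing.Pool(initializer=initWorker, initargs=(globalData,)) as pool:
        return pool.map(workerFunction, enumerate(arguments))
```

Because map blocks until every result is available, the results are fully collected before the with block exits and the workers are terminated.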