Search code examples
pythonfile-iomultiprocessingpython-multiprocessingcontextmanager

getting OSError: too many open files when running longish test suite on code that uses multiprocessing


Running python 2.7, dev environment is OS X but production is linux.

I've got some code I'm trying to speed up with multiprocessing, and I got it working fine and observed the desired theoretical speedups. I then went to run the test suite on it, and after a few tests, started getting the above OSError on all subsequent tests. If I run the tests from the point where I start getting the error, some number of them pass and then I get that error again. Which is fairly logical, just a sanity check.

To try to figure out what was going wrong, I replaced __builtin__'s open and close calls with ones that print (following advice in https://stackoverflow.com/a/2023709/3543200)

import __builtin__
import traceback
import sys
openfiles = set()
oldfile = __builtin__.file
class newfile(oldfile):
    def __init__(self, *args):
        self.x = args[0]
        print "### OPENING %s ###" % str(self.x)
        traceback.print_stack(limit=20)
        print
        sys.stdout.flush()
        oldfile.__init__(self, *args)
        openfiles.add(self)
    def close(self):
        print "### CLOSING %s ###" % str(self.x)
        oldfile.close(self)
        openfiles.remove(self)
oldopen = __builtin__.open
def newopen(*args):
    return newfile(*args)
__builtin__.file = newfile
__builtin__.open = newopen

and what did I see but hundreds and hundreds of lines of ### OPENING /dev/null ###.

When I do the same thing for the code that accomplishes the same task but without the multiprocessing, I get no such file connections, so it stands to reason that the multiprocessing is at fault here. This is supported by the traceback call, which suggests that the culprit is here:

  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 250, in _bootstrap
    sys.stdin = open(os.devnull)

posting the code of the multiprocessing::process.py::_bootstrap function here, just in case it's helpful:

def _bootstrap(self):
    from . import util
    global _current_process

    try:
        self._children = set()
        self._counter = itertools.count(1)
        try:
            sys.stdin.close()
            sys.stdin = open(os.devnull)
        except (OSError, ValueError):
            pass
        _current_process = self
        util._finalizer_registry.clear()
        util._run_after_forkers()
        util.info('child process calling self.run()')
        try:
            self.run()
            exitcode = 0
        finally:
            util._exit_function()
    except SystemExit, e:
        if not e.args:
            exitcode = 1
        elif isinstance(e.args[0], int):
            exitcode = e.args[0]
        else:
            sys.stderr.write(str(e.args[0]) + '\n')
            sys.stderr.flush()
            exitcode = 1
    except:
        exitcode = 1
        import traceback
        sys.stderr.write('Process %s:\n' % self.name)
        sys.stderr.flush()
        traceback.print_exc()

    util.info('process exiting with exitcode %d' % exitcode)
    return exitcode

And, for what it's worth, I'm invoking multiprocessing with code that looks like this:

num_cpus = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=num_cpus)
num_per_job = len(input_data) / num_cpus + 1
chunks = [input_data[num_per_job*i:num_per_job*(i+1)] for i in range(num_cpus)]
# TODO: ^^^ make this a list of generators
data = pool.map(get_output_from_input, chunks)
return itertools.chain.from_iterable(data)

So, question: is this a bug in multiprocessing, or am I doing something terribly wrong? I would really welcome the excuse to spend the next week digging through the multiprocessing code and figuring out how it works, but I would have trouble convincing the higher-ups that this is a valid use of my time. Appreciate anyone with experience helping out!


Solution

  • You need to close the pools to terminate the child processes and free the pipes used to communicate with them. Do it with contextlib.closing so that you don't have to worry about exceptions skipping the close. closing will close the pool at the end of the with block, including when it is exited with an exception. So, you never need to call close yourself.

    Also, Pool.map chunks its requests so you don't have to do it yourself. I removed that bit of code but the get_output_from_input signature may not be right (it will be called once per input item, not once with a list of input items) so you may need to do some fixups.

    import contextlib
    num_cpus = multiprocessing.cpu_count()
    with contextlib.closing(multiprocessing.Pool(processes=num_cpus)) as pool:
        data = pool.map(get_output_from_input, input_data)
        return itertools.chain.from_iterable(data)