python parallel-processing concurrent.futures numpy-memmap

Why is concurrent.futures holding onto memory when returning np.memmap?


The problem

My application extracts a list of zip files in memory and writes the data to a temporary file. I then memory-map the data in the temp file for use in another function. When I do this in a single process it works fine: reading the data doesn't affect memory, and max RAM is around 40MB. However, when I do this using concurrent.futures, the RAM goes up to 500MB.

I have looked at this example and I understand I could be submitting the jobs in a nicer way to save memory during processing. But I don't think my issue is related, as I am not running out of memory during processing. What I don't understand is why the memory is still held after the memory maps are returned. Nor do I understand what is in the memory, since doing this in a single process does not load the data into memory.

Can anyone explain what is actually in the memory and why this is different between single and parallel processing?

PS: I used memory_profiler to measure the memory usage.

Code

Main code:

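import concurrent.futures
import os
import tempfile
import time
from zipfile import ZipFile
from numpy import memmap

def main():
    datadir = './testdata'
    # Only pick up the zip archives; the helper code below also leaves .dat files here
    files = [os.path.join(datadir, f) for f in os.listdir(datadir) if f.endswith('.zip')]
    datalist = download_files(files, multiprocess=False)
    print(len(datalist))
    time.sleep(15)
    del datalist # See here that memory is freed up
    time.sleep(15)

if __name__ == '__main__':  # required when worker processes are spawned
    main()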

Other functions:

def download_files(filelist, multiprocess=False):
    datalist = []
    if multiprocess:
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
            returned_future = [executor.submit(extract_file, f) for f in filelist]
        for future in returned_future:
            datalist.append(future.result())
    else:
        for f in filelist:
            datalist.append(extract_file(f))
    return datalist

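def extract_file(input_zip):
    # Take the bytes of the first member of the archive
    buffer = next(iter(extract_zip(input_zip).values()))
    with tempfile.NamedTemporaryFile() as temp_logfile:
        temp_logfile.write(buffer)
        temp_logfile.flush()  # make sure every byte is on disk before mapping
        del buffer
        data = memmap(temp_logfile, dtype='float32', shape=(2000000, 4), mode='r')
    # The temp file is deleted on close; on Unix the mapping remains usable
    return data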

def extract_zip(input_zip):
    # Read every member of the archive into memory, keyed by member name
    with ZipFile(input_zip, 'r') as archive:
        return {name: archive.read(name) for name in archive.namelist()}

Helper code for data

I can't share my actual data, but here's some simple code to create files that demonstrate the issue:

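import os
import numpy as np
from zipfile import ZipFile, ZIP_DEFLATED

outdir = './testdata'
os.makedirs(outdir, exist_ok=True)  # the output directory must exist
for i in range(1, 16):
    outfile = 'file_{}.dat'.format(i)
    # 2,000,000 x 4 float32 values, i.e. 32 MB per file
    fp = np.memmap(os.path.join(outdir, outfile), dtype='float32', mode='w+', shape=(2000000, 4))
    fp[:] = np.random.rand(*fp.shape)
    del fp  # flushes the changes to disk and drops the map
    with ZipFile(outdir + '/' + outfile[:-4] + '.zip', mode='w', compression=ZIP_DEFLATED) as z:
        z.write(outdir + '/' + outfile, outfile)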

Solution

  • The problem is that you're trying to pass an np.memmap between processes, and that doesn't work.

    The simplest solution is to instead pass the filename, and have the receiving process memmap the same file.


    When you pass an argument to a child process or pool method via multiprocessing, or return a value from one (including doing so indirectly via a ProcessPoolExecutor), it works by calling pickle.dumps on the value, passing the pickle across processes (the details vary, but it doesn't matter whether it's a Pipe or a Queue or something else), and then unpickling the result on the other side.
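A minimal sketch of that round trip, done by hand in a single process. The `round_trip` helper and the 1 MB `bytearray` are illustrative stand-ins, not part of the question's code; a real pool does the two halves in different processes.

```python
import pickle

def round_trip(value):
    """Mimic what happens to a value that crosses a process boundary."""
    payload = pickle.dumps(value)     # done in the sending process
    restored = pickle.loads(payload)  # done in the receiving process
    return payload, restored

data = bytearray(1_000_000)  # 1 MB standing in for a worker's return value
payload, restored = round_trip(data)

print(len(payload) >= len(data))  # the payload carries every byte of the data
print(restored == data)           # the contents are the same...
print(restored is data)           # ...but the receiver holds its own copy
```

The point is that the receiver never sees the original object, only whatever the pickle managed to describe.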

    A memmap is basically just an mmap object with an ndarray allocated in the mmapped memory.

    And Python doesn't know how to pickle an mmap object. (If you try, you will either get a PicklingError or a BrokenProcessPool error, depending on your Python version.)
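This is easy to check directly with the standard library. The sketch below uses an anonymous map and a hypothetical `try_pickle` helper; since the exact exception type may differ between Python versions, it only reports the name of whatever was raised.

```python
import mmap
import pickle

def try_pickle(obj):
    """Return the name of the exception raised by pickling, or None on success."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return type(exc).__name__
    return None

anon_map = mmap.mmap(-1, 4096)  # anonymous mapping, no file needed
error_name = try_pickle(anon_map)
anon_map.close()

print(error_name)  # some exception name, i.e. pickling did not succeed
```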

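    An np.memmap can be pickled, because it's just a subclass of np.ndarray, but pickling and unpickling it actually copies the data and gives you a plain in-memory array. (If you look at data._mmap, it's None.) That is where the memory goes: with the test data from the question, each (2000000, 4) float32 array is 32 MB, and 15 of them copied into the parent process come to about 480 MB, which matches the roughly 500MB observed. It would probably be nicer if it gave you an error instead of silently copying all of your data (the pickle-replacement library dill does exactly that: TypeError: can't pickle mmap.mmap objects), but it doesn't.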
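A small sketch that shows the silent copy. The file name, the (1000, 4) shape and the `memmap_round_trip` helper are made up for illustration, and `_mmap` is a private NumPy attribute, so treat the check as a diagnostic rather than a supported API.

```python
import os
import pickle
import tempfile

import numpy as np

def memmap_round_trip():
    """Pickle and unpickle a small memmap and report what comes back."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "demo.dat")  # hypothetical file name
        original = np.memmap(path, dtype="float32", mode="w+", shape=(1000, 4))
        original[:] = 1.0
        original.flush()

        payload = pickle.dumps(original)
        restored = pickle.loads(payload)

        result = {
            "original_is_mapped": getattr(original, "_mmap", None) is not None,
            "restored_is_mapped": getattr(restored, "_mmap", None) is not None,
            "same_values": bool(np.array_equal(original, restored)),
            # The pickle contains the full array contents, not a file reference
            "payload_holds_data": len(payload) >= original.nbytes,
        }
        del original  # release the mapping before the directory is removed
    return result

result = memmap_round_trip()
print(result)
```

The restored array has the same values but is no longer backed by the file, so every byte of it lives in the receiving process's RAM.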


    It's not impossible to pass the underlying file descriptor between processes. The details are different on every platform, but all of the major platforms have a way to do that. You could then use the passed fd to build an mmap on the receiving side, and build a memmap out of that. You could probably even wrap this up in a subclass of np.memmap. But I suspect that if this weren't somewhat difficult, someone would have already done it, and in fact it would probably already be part of dill, if not numpy itself.
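For a sense of what the Unix flavor of this looks like, here is a rough sketch using socket.send_fds and socket.recv_fds. It assumes Python 3.9 or later on a Unix system, and it keeps both ends of the socket pair in one process purely to stay short; in real use the receiving end would live in the other process. The `pass_fd_and_map` helper is hypothetical, and it stops at a plain mmap rather than building an np.memmap.

```python
import mmap
import os
import socket
import tempfile

def pass_fd_and_map(payload: bytes) -> bytes:
    """Send a file descriptor over a Unix socket pair and mmap it on receipt."""
    sender, receiver = socket.socketpair()
    with tempfile.TemporaryFile() as tmp, sender, receiver:
        tmp.write(payload)
        tmp.flush()

        # "Sending" side: ship the descriptor along with a small message
        socket.send_fds(sender, [b"fd"], [tmp.fileno()])

        # "Receiving" side: obtain a new descriptor for the same open file
        _msg, fds, _flags, _addr = socket.recv_fds(receiver, 16, 1)
        received_fd = fds[0]
        try:
            # Length 0 maps the whole file
            with mmap.mmap(received_fd, 0, access=mmap.ACCESS_READ) as mapped:
                return bytes(mapped[:])
        finally:
            os.close(received_fd)

print(pass_fd_and_map(b"hello mmap"))
```

Even this toy version is tied to one platform family, which gives an idea of why nobody has wrapped it up in a portable memmap subclass.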

    Another alternative is to explicitly use the shared memory features of multiprocessing, and allocate the array in shared memory instead of a mmap.
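One way to do that on Python 3.8 or later is multiprocessing.shared_memory. The sketch below is only an outline under that assumption: it stores four floats with struct instead of a NumPy array, and it attaches to the block from the same process, where a worker would normally receive just the block's name and attach on its side. The `share_values` helper and `FORMAT` constant are illustrative names.

```python
import struct
from multiprocessing import shared_memory

FORMAT = "4f"  # four float32 values, standing in for a real array

def share_values(values):
    """Write values into a shared block, then read them back via its name."""
    owner = shared_memory.SharedMemory(create=True, size=struct.calcsize(FORMAT))
    try:
        struct.pack_into(FORMAT, owner.buf, 0, *values)

        # Only owner.name (a short string) would need to cross the process
        # boundary; the other side attaches to the same memory by name.
        attached = shared_memory.SharedMemory(name=owner.name)
        try:
            return struct.unpack_from(FORMAT, attached.buf, 0)
        finally:
            attached.close()
    finally:
        owner.close()
        owner.unlink()  # free the block once nobody needs it

print(share_values((1.0, 2.0, 3.0, 4.0)))
```

With NumPy, the same buffer can back an array via np.ndarray(shape, dtype=..., buffer=shm.buf), at the cost of the data living in RAM rather than in a file.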

    But the simplest solution is, as I said at the top, to just pass the filename instead of the object, and let each side memmap the same file. This does, unfortunately, mean you can't just use a delete-on-close NamedTemporaryFile (although the way you were using it was already non-portable and wouldn't have worked on Windows the same way it does on Unix), but changing that is still probably less work than the other alternatives.
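A minimal sketch of the filename approach. It is not the question's code: `write_data_file`, `open_memmap`, `load_all` and the small `SHAPE` are hypothetical, and the worker writes a constant array where the real one would write the extracted zip contents. The worker returns only a path, and the parent builds the memmap itself.

```python
import os
import tempfile
from concurrent.futures import ProcessPoolExecutor

import numpy as np

SHAPE = (1000, 4)  # illustrative; the question uses (2000000, 4)
DTYPE = "float32"

def write_data_file(value):
    """Worker: write the data to a persistent temp file and return its path."""
    fd, path = tempfile.mkstemp(suffix=".dat")  # not deleted on close
    with os.fdopen(fd, "wb") as f:
        f.write(np.full(SHAPE, value, dtype=DTYPE).tobytes())
    return path  # a short string is all that gets pickled

def open_memmap(path):
    """Receiving side: map the file instead of receiving a copied array."""
    return np.memmap(path, dtype=DTYPE, mode="r", shape=SHAPE)

def load_all(values, max_workers=2):
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        paths = list(executor.map(write_data_file, values))
    return paths, [open_memmap(p) for p in paths]

if __name__ == "__main__":
    paths, maps = load_all(range(3))
    print([m.shape for m in maps])
    del maps            # release the mappings first
    for p in paths:
        os.remove(p)    # cleanup is now the caller's responsibility
```

The trade-off is the one mentioned above: because the file has to outlive the worker, deleting it becomes an explicit step in the parent.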