My application extracts a list of zip files in memory and writes the data to a temporary file. I then memory-map the data in the temp file for use in another function. When I do this in a single process it works fine: reading the data barely affects memory, and max RAM usage is around 40 MB. However, when I do this using concurrent.futures, RAM usage goes up to 500 MB.
I have looked at this example and I understand I could be submitting the jobs in a nicer way to save memory during processing. But I don't think my issue is related, as I am not running out of memory during processing. What I don't understand is why the memory is still held even after the memory maps are returned, nor what is actually in that memory, since doing the same thing in a single process does not load the data into memory.
Can anyone explain what is actually in memory and why this differs between single and parallel processing?
P.S. I used memory_profiler for measuring the memory usage.
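(For reference, a minimal sketch of how that measurement can be wired up; the @profile decorator is memory_profiler's line-by-line profiler, and the body is elided here because the full function follows below.)

from memory_profiler import profile  # pip install memory_profiler

@profile        # reports per-line memory usage for main() when the script runs
def main():
    ...         # full body as shown below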
import concurrent.futures
import os
import tempfile
import time
from zipfile import ZipFile, ZIP_DEFLATED
import numpy as np
from numpy import memmap

def main():
    datadir = './testdata'
    files = os.listdir(datadir)
    files = [os.path.join(datadir, f) for f in files]
    datalist = download_files(files, multiprocess=False)
    print(len(datalist))
    time.sleep(15)
    del datalist  # See here that memory is freed up
    time.sleep(15)
def download_files(filelist, multiprocess=False):
    datalist = []
    if multiprocess:
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
            returned_future = [executor.submit(extract_file, f) for f in filelist]
            for future in returned_future:
                datalist.append(future.result())
    else:
        for f in filelist:
            datalist.append(extract_file(f))
    return datalist
def extract_file(input_zip):
    buffer = next(iter(extract_zip(input_zip).values()))
    with tempfile.NamedTemporaryFile() as temp_logfile:
        temp_logfile.write(buffer)
        del buffer
        data = memmap(temp_logfile, dtype='float32', shape=(2000000, 4), mode='r')
    return data
def extract_zip(input_zip):
    with ZipFile(input_zip, 'r') as input_zip:
        return {name: input_zip.read(name) for name in input_zip.namelist()}

if __name__ == '__main__':
    main()
I can't share my actual data, but here's some simple code to create files that demonstrate the issue:
for i in range(1, 16):
    outdir = './testdata'
    outfile = 'file_{}.dat'.format(i)
    fp = np.memmap(os.path.join(outdir, outfile), dtype='float32', mode='w+', shape=(2000000, 4))
    fp[:] = np.random.rand(*fp.shape)
    del fp
    with ZipFile(outdir + '/' + outfile[:-4] + '.zip', mode='w', compression=ZIP_DEFLATED) as z:
        z.write(outdir + '/' + outfile, outfile)
The problem is that you're trying to pass an np.memmap between processes, and that doesn't work. The simplest solution is to instead pass the filename, and have the child process memmap the same file.
When you pass an argument to a child process or pool method via multiprocessing, or return a value from one (including doing so indirectly via a ProcessPoolExecutor), it works by calling pickle.dumps on the value, passing the pickle across processes (the details vary, but it doesn't matter whether it's a Pipe or a Queue or something else), and then unpickling the result on the other side.
A memmap is basically just an mmap object with an ndarray allocated in the mmapped memory. And Python doesn't know how to pickle an mmap object. (If you try, you will either get a PicklingError or a BrokenProcessPool error, depending on your Python version.)
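You can see this without any multiprocessing at all; a minimal sketch (the temporary file here is just scaffolding so there is something to map):

import mmap
import pickle
import tempfile

with tempfile.TemporaryFile() as f:
    f.write(b'\x00' * 1024)
    f.flush()
    mm = mmap.mmap(f.fileno(), 1024)
    # Depending on the Python version this raises TypeError
    # ("cannot pickle 'mmap.mmap' object") or a PicklingError.
    pickle.dumps(mm)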
An np.memmap can be pickled, because it's just a subclass of np.ndarray, but pickling and unpickling it actually copies the data and gives you a plain in-memory array. (If you look at data._mmap, it's None.) It would probably be nicer if it gave you an error instead of silently copying all of your data (the pickle-replacement library dill does exactly that: TypeError: can't pickle mmap.mmap objects), but it doesn't.
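You can watch the silent copy happen by round-tripping a memmap through pickle by hand, which is effectively what the executor does with your return value (a sketch against one of the generated test files):

import pickle
import numpy as np

data = np.memmap('./testdata/file_1.dat', dtype='float32',
                 shape=(2000000, 4), mode='r')
copy = pickle.loads(pickle.dumps(data))

print(type(copy))    # still reports numpy.memmap...
print(copy._mmap)    # None -- no longer backed by the file
print(copy.nbytes)   # 32000000 bytes, now held in ordinary process memory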
It's not impossible to pass the underlying file descriptor between processes; the details are different on every platform, but all of the major platforms have a way to do that. You could then use the passed fd to build an mmap on the receiving side, then build a memmap out of that, and you could probably even wrap this up in a subclass of np.memmap. But I suspect that if that weren't somewhat difficult, someone would have already done it, and in fact it would probably already be part of dill, if not numpy itself.
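For completeness, a rough sketch of what the fd-passing route could look like on Unix with multiprocessing.reduction; everything here (the child function, the Pipe wiring, the reshape) is an assumption for illustration, not something numpy or dill provides:

import mmap
import os
import numpy as np
from multiprocessing import Pipe, Process
from multiprocessing.reduction import recv_handle, send_handle

def child(conn):
    # Receive a duplicated file descriptor from the parent and map it directly;
    # no array data ever travels through a pickle.
    fd = recv_handle(conn)
    length = os.fstat(fd).st_size
    mm = mmap.mmap(fd, length, access=mmap.ACCESS_READ)
    arr = np.frombuffer(mm, dtype='float32').reshape(-1, 4)
    print(arr.shape)

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=child, args=(child_conn,))
    p.start()
    with open('./testdata/file_1.dat', 'rb') as f:
        send_handle(parent_conn, f.fileno(), p.pid)
    p.join()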
Another alternative is to explicitly use the shared memory features of multiprocessing, and allocate the array in shared memory instead of an mmap.
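A minimal sketch of that approach, assuming Python 3.8+ and its multiprocessing.shared_memory module (older versions would use multiprocessing.Array / sharedctypes instead); the fill_block worker and the block-name plumbing are illustrative assumptions:

import numpy as np
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import shared_memory

def fill_block(shm_name, shape):
    # Attach to the existing block by name; the child writes straight into
    # shared memory, so nothing has to be pickled back to the parent.
    shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype='float32', buffer=shm.buf)
    arr[:] = np.random.rand(*shape)
    del arr              # drop the view before closing our handle
    shm.close()

if __name__ == '__main__':
    shape = (2000000, 4)
    nbytes = int(np.prod(shape)) * 4          # float32
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    with ProcessPoolExecutor(max_workers=4) as executor:
        executor.submit(fill_block, shm.name, shape).result()
    view = np.ndarray(shape, dtype='float32', buffer=shm.buf)
    print(view[0])       # data written by the child, visible without any copy
    del view             # drop the view before releasing the block
    shm.close()
    shm.unlink()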
But the simplest solution is, as I said at the top, to just pass the filename instead of the object, and let each side memmap the same file. This does, unfortunately, mean you can't just use a delete-on-close NamedTemporaryFile (although the way you were using it was already non-portable and wouldn't have worked on Windows the same way it does on Unix), but changing that is still probably less work than the other alternatives.
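Concretely, one way to restructure extract_file along those lines; a sketch under assumptions (delete=False keeps the temp file on disk, so the caller becomes responsible for removing it, and load_file is a hypothetical helper for the parent side):

import tempfile
import numpy as np

def extract_file(input_zip):
    # Write the decompressed payload to a temp file that survives close(),
    # and hand back only the path; only a short string crosses the pickle.
    buffer = next(iter(extract_zip(input_zip).values()))
    with tempfile.NamedTemporaryFile(delete=False, suffix='.dat') as temp_logfile:
        temp_logfile.write(buffer)
        temp_path = temp_logfile.name
    return temp_path

def load_file(temp_path):
    # Runs in the parent: map the file instead of receiving a pickled array.
    return np.memmap(temp_path, dtype='float32', shape=(2000000, 4), mode='r')

download_files then collects paths from the futures, maps each one with load_file on its own side, and removes the temp files with os.remove once the data is no longer needed.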