How to use multiprocessing.Pool.map with a very big iterable?
I need to process a big data file using multiprocessing. The file is 100GB+, so it's not possible to read it into memory. I created a generator to produce data for multiprocessing.Pool.map, but it still loads the entire file into memory.
import multiprocessing

def generate_data():
    with open('data.file') as data_file:
        for line in data_file:
            yield line

def run(data):
    process(data)

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    pool.map(run, generate_data())
After running this code, I receive a MemoryError. It looks like multiprocessing.Pool.map consumes the whole generator into memory.
How can I process generate_data() with multiprocessing.Pool.map without loading everything into memory?
By using a reasonable chunk size with imap, you can reduce the RAM requirement significantly.
There is no need for an explicit generator, as the file handle is itself an iterator that yields one line at a time.
This was tested using a file of 104,857,600 lines, each 1 kB long - in other words, 100 GB in total.
Running on macOS, no Python process was observed to exceed 4 GB of RAM.
from multiprocessing import Pool
from subprocess import Popen, PIPE

FILENAME = '/tmp/100GB.txt'

# if the file is very large, this can be slow. You may prefer to provide an estimate
def nlines(filename):
    print('Counting lines...', end='', flush=True)
    process = Popen(['wc', '-l', filename], stdout=PIPE, stderr=PIPE)
    stdout, _ = process.communicate()
    print('done')
    return int(stdout.decode().split()[0])

# process each line from the file
def process(line):
    ...

def main():
    with Pool() as pool:
        with open(FILENAME) as data:
            chunksize, extra = divmod(nlines(FILENAME), len(pool._pool) * 4)  # type: ignore
            if extra:
                chunksize += 1
            print(f'{chunksize=}')
            for _ in pool.imap(process, data, chunksize=chunksize):
                ...

if __name__ == '__main__':
    main()
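As a side note, len(pool._pool) reads a private attribute (the pool's internal list of worker processes), and wc -l costs a full pass over the file before any work starts. If you would rather avoid both, a sketch along the following lines should also keep memory bounded: it uses a fixed chunk size and imap_unordered, which (like imap, and unlike map) does not convert its input iterable into a list, so the file is consumed lazily. CHUNKSIZE here is a hypothetical value you would tune yourself, and process() stands in for the same per-line work as above.

from multiprocessing import Pool
import os

FILENAME = '/tmp/100GB.txt'
CHUNKSIZE = 10_000  # hypothetical value; tune for your line length and RAM budget

# same per-line work as in the answer above
def process(line):
    ...

def main():
    # Pool() already defaults to os.cpu_count() workers; spelling it out here
    # just avoids reaching into the private _pool attribute for the worker count
    with Pool(processes=os.cpu_count()) as pool:
        with open(FILENAME) as data:
            # unlike map(), imap_unordered() does not materialise the whole
            # iterable up front, so lines are read from the file as needed
            for _ in pool.imap_unordered(process, data, chunksize=CHUNKSIZE):
                ...

if __name__ == '__main__':
    main()

A bigger chunksize means fewer, larger pickled batches sent to the workers (less IPC overhead, more memory per batch in flight); a smaller one does the opposite. Use imap instead of imap_unordered if you need the results back in input order.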