python, dictionary, multiprocessing, bigdata, generator

How to use multiprocessing.Pool.map with a very big iterable?

I need to process a big data file using multiprocessing. The file is 100GB+, so it is not possible to read it into memory. So I created a generator to produce data for multiprocessing.Pool.map. However, it still loads the entire file into memory.

import multiprocessing

def generate_data():
    with open('data.file') as data_file:
        for line in data_file:
            yield line


def run(data):
    process(data)
    
if __name__ == '__main__':
    pool = multiprocessing.Pool()
    pool.map(run, generate_data())

After running this code, I receive a MemoryError. It looks like multiprocessing.Pool.map loads the entire generator into memory.

How can I process generate_data() with multiprocessing.Pool.map without loading it into memory?


Solution

  • By using a reasonable chunk size with imap, you can reduce the RAM requirement significantly.

    No need for an explicit generator as the file handle is itself an iterator.

    This was tested using a file of 104,857,600 lines, each 1,024 bytes long - in other words, 100GB in total.

    Running on macOS, no Python process was observed to exceed 4GB of RAM.

    from multiprocessing import Pool
    from subprocess import Popen, PIPE
    
    FILENAME = '/tmp/100GB.txt'
    
    # if the file is very large, this can be slow. You may prefer to provide an estimate
    def nlines(filename):
        print('Counting lines...', end='', flush=True)
        process = Popen(['wc', '-l', filename], stdout=PIPE, stderr=PIPE)
        stdout, _ = process.communicate()
        print('done')
        return int(stdout.decode().split()[0])
    
    # process each line from the file
    def process(line):
        ...
    
    def main():
        with Pool() as pool:
            with open(FILENAME) as data:
                # aim for roughly 4 chunks per worker; pool._pool is the
                # (private) list of worker processes, hence the type: ignore
                chunksize, extra = divmod(nlines(FILENAME), len(pool._pool) * 4)  # type: ignore
                if extra:
                    chunksize += 1
                print(f'{chunksize=}')
                # imap is lazy: lines are read and dispatched chunk by chunk
                # instead of materialising the whole file as a list
                for _ in pool.imap(process, data, chunksize=chunksize):
                    ...
    
    if __name__ == '__main__':
        main()
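
    If counting the lines up front is too slow, or you would rather not rely on the private pool._pool attribute, a variation is to pick a fixed chunk size and iterate with imap_unordered (fine whenever the order of results does not matter). A minimal sketch under those assumptions - the chunk size of 10_000 is an arbitrary starting point to tune, and process is the same placeholder as above:

    from multiprocessing import Pool

    FILENAME = '/tmp/100GB.txt'

    # process each line from the file
    def process(line):
        ...

    def main():
        with Pool() as pool:
            with open(FILENAME) as data:
                # a fixed chunksize avoids the up-front line count;
                # 10_000 is an arbitrary guess - tune it for your line
                # length and worker count
                for _ in pool.imap_unordered(process, data, chunksize=10_000):
                    ...

    if __name__ == '__main__':
        main()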