Tags: python, linux, generator, large-files, concurrent.futures

Can ProcessPoolExecutor work with yield generator in Python?


I have a Python script that processes a large file and writes the results to a new txt file. I simplified it as shown below. Code example 1:

from concurrent.futures import ProcessPoolExecutor
import pandas as pd
from collections import OrderedDict
import os

def process(args):
    parm1, parm2, parm3, output_file = args
    scores = []
    # do something with scores...
    if parm1:
        scores = [parm2, sum(parm3)]
    else:
        scores = [parm2/len(parm3), sum(parm3)/len(parm3)]
    # create result
    result = '\t'.join(scores) + '\n'
    output_file.write(result)

def main(large_file_path, output_path, max_processes):
    #do something ...
    with open(large_file_path, 'r') as large_file, open(output_path, 'w') as output_file:
        arg_list = []
        parm1, parm2, parm3 = False, 0, []
        for line in large_file:
            if line.startswith('#'):
                continue
            else:
                # do something to update parm1, parm2, parm3...
                data = line.strip().split('\t')
                parm1 = data[0] > 0
                parm2 = data[1]
                parm3 = data[2:]
                # add to list
                arg_list.append((parm1, parm2, parm3, output_file))
        with ProcessPoolExecutor(max_processes) as executor:
            executor.map(process, arg_list, chunksize=int(max_processes/2))

if __name__ == "__main__":
    large_path = "/path/to/large_file"
    output_path = f"para_scores.txt"
    max_processes = int(os.cpu_count()/2)# Set the maximum number of processes

    main(large_path, output_path, max_processes)

I realized that arg_list might become quite large if large_file is very big, and I am not sure there is enough free memory for it. So I tried using a yield generator instead of a plain Python list, as in Code example 2, which runs without errors but produces no output. Code example 2:

from concurrent.futures import ProcessPoolExecutor
import pandas as pd
from collections import OrderedDict
import os

def process(args):
    parm1, parm2, parm3, output_file = args
    scores = []
    # do something with scores...
    if parm1:
        scores = [parm2, sum(parm3)]
    else:
        scores = [parm2/len(parm3), sum(parm3)/len(parm3)]
    # create result
    result = '\t'.join(scores) + '\n'
    output_file.write(result)

def main(large_file_path, output_path, max_processes):
    #do something ...
    with open(large_file_path, 'r') as large_file, open(output_path, 'w') as output_file:
        def arg_generator(large_file, output_file):
            parm1, parm2, parm3 = False, 0, []
            for line in large_file:
                if line.startswith('#'):
                    continue
                else:
                    # do something to update parm1, parm2, parm3...
                    data = line.strip().split('\t')
                    parm1 = data[0] > 0
                    parm2 = data[1]
                    parm3 = data[2:]
                    # yield
                    yield (parm1, parm2, parm3, output_file)
        with ProcessPoolExecutor(max_processes) as executor:
            executor.map(process, arg_generator(large_file, output_file), chunksize=int(max_processes/2))

if __name__ == "__main__":
    large_path = "/path/to/large_file"
    output_path = f"para_scores.txt"
    max_processes = int(os.cpu_count()/2)# Set the maximum number of processes

    main(large_path, output_path, max_processes)

I ran the code on an Ubuntu 20.04.6 LTS server with Python 3.9.18.

So, can ProcessPoolExecutor work with a yield generator in Python? Or is the use of executor.map problematic? What should I do to make it work?


Solution

  • Yes, it can. Here's an example:

    from concurrent.futures import ProcessPoolExecutor
    
    
    def process(x):
        print(x * 2)
    
    
    def gen():
        for i in range(3):
            yield i
    
    
    def main():
        print('starting')
        with ProcessPoolExecutor() as executor:
            executor.map(process, gen())
        print('done')
    
    
    main()
    

    Output:

    starting
    0
    2
    4
    done
    

    The worker processes don't know where their arguments came from; they receive individual items, not the entire generator, so using a generator makes no difference.

    However, trying to pass an open file handle does silently break things:

    from concurrent.futures import ProcessPoolExecutor
    
    
    def process(f):
        print('processing', f)
    
    
    def main():
        print('starting')
        with open('path_to_file.txt') as f:
            with ProcessPoolExecutor() as executor:
                executor.map(process, [f])
        print('done')
    
    
    main()
    

    The output is missing the 'processing':

    starting
    done
    
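    The reason appears to be that ProcessPoolExecutor has to pickle every argument in order to send it to a worker process, and open file objects cannot be pickled. Because the iterator returned by executor.map is never consumed, the resulting exception is silently dropped instead of being raised in the main process. A quick check makes the underlying problem visible (the file name is just a placeholder):

    import pickle


    with open('path_to_file.txt') as f:
        pickle.dumps(f)  # TypeError: cannot pickle '_io.TextIOWrapper' object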

    Instead, pass the filename as a string to each process, and use a different filename for each process so that they don't overwrite each other, as in the sketch below.
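
    Applied to the code from the question, a minimal sketch of that approach could look like the one below. The numeric conversions, the output_dir parameter and the per-task file names are illustrative assumptions, since the real processing is elided in the question; the key points are that each worker receives a plain filename rather than a shared handle, and that the map results are consumed so any exception raised in a worker surfaces in the main process.

    from concurrent.futures import ProcessPoolExecutor
    import os


    def process(args):
        parm1, parm2, parm3, out_path = args
        if parm1:
            scores = [parm2, sum(parm3)]
        else:
            scores = [parm2 / len(parm3), sum(parm3) / len(parm3)]
        # Each worker opens its own output file, so no handle is shared between processes.
        with open(out_path, 'w') as out:
            out.write('\t'.join(str(s) for s in scores) + '\n')


    def arg_generator(large_file_path, output_dir):
        with open(large_file_path) as large_file:
            for i, line in enumerate(large_file):
                if line.startswith('#'):
                    continue
                data = line.strip().split('\t')
                parm1 = float(data[0]) > 0            # assumed numeric fields
                parm2 = float(data[1])
                parm3 = [float(x) for x in data[2:]]
                # A different output file per task, so workers never overwrite each other.
                yield (parm1, parm2, parm3, os.path.join(output_dir, f'scores_{i}.txt'))


    def main(large_file_path, output_dir, max_processes):
        with ProcessPoolExecutor(max_processes) as executor:
            results = executor.map(process, arg_generator(large_file_path, output_dir),
                                   chunksize=max(1, max_processes // 2))
            # Consuming the iterator re-raises any exception from the workers.
            for _ in results:
                pass

    The per-task files can then be concatenated into a single output file once all workers have finished.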