Tags: python, parallel-processing, multiprocessing, large-data

Pass around large amounts of data with multiprocessing


I am trying to figure out how to write a program that performs computations in parallel such that the result of each computation can be written to a file in a specific order. My problem is size; I would like to do what I've outlined in the sample program below: save the large output as the value of a dictionary that stores the ordering in its keys. But my program keeps breaking because it can't store/pass around so many bytes.

Is there a set way to approach such problems? I'm new to dealing with both multiprocessing and large data.

from multiprocessing import Process, Manager

def eachProcess(i, d):
    LARGE_BINARY_OBJECT = ...  # perform some computation resulting in millions of bytes
    d[i] = LARGE_BINARY_OBJECT

def main():
    manager = Manager()
    d = manager.dict()
    maxProcesses = 10
    for i in range(maxProcesses):
        process = Process(target=eachProcess, args=(i, d))
        process.start()

    # write each result to the output file as soon as its turn comes up
    counter = 0
    with open("test.txt", "wb") as file1:
        while counter < maxProcesses:
            if counter in d:
                file1.write(d[counter])
                counter += 1

if __name__ == '__main__':
    main()

Thank you.


Solution

  • When dealing with large amounts of data, there are usually two approaches:

    1. Local file system if the problem is simple enough
    2. Remote data storage if more complex support over data is needed

    As your problem seems pretty simple, I'd suggest the following solution. Each process writes its partial result to a local file. Once all processing is done, the main process combines all the result files.

    from multiprocessing import Pool
    from tempfile import NamedTemporaryFile

    def worker_function(partial_result_path):
        data = produce_large_binary()  # placeholder for the expensive computation
        with open(partial_result_path, 'wb') as partial_result_file:
            partial_result_file.write(data)

    if __name__ == '__main__':
        max_processes = 10

        # storing partial results in temporary files; delete=False keeps them
        # on disk so they can be reopened by the workers and the main process
        partial_result_paths = []
        for _ in range(max_processes):
            tmp = NamedTemporaryFile(delete=False)
            tmp.close()
            partial_result_paths.append(tmp.name)

        with Pool(max_processes) as pool:
            pool.map(worker_function, partial_result_paths)

        # concatenate the partial results into the final file, in order
        with open('test.txt', 'wb') as result_file:
            for partial_result_path in partial_result_paths:
                with open(partial_result_path, 'rb') as partial_result_file:
                    result_file.write(partial_result_file.read())