Tags: python, parallel-processing, storage, python-multiprocessing

Parallelize and separate computation from storage in Python


I am trying to parallelize the computation and storage of intermediate results. The task can be described as follows:

Given a large set of tasks, take a chunk of tasks and parallelize the computation across the available CPUs/GPUs. The output is relatively large, so the results of all chunks together do not fit in memory. Once a chunk has been computed, the results collected from the worker processes therefore have to be written to a single result file. The real storage mechanism is a bit more complicated and cannot easily be moved into the individual jobs; I really need to collect the results and store them together.

The storage part takes quite some time and I don't know how to decouple the two things. Ideally, the workflow would be: compute -> collect -> store / while storing start computing already -> compute/store etc.

Here is some dummy code that only features the parallel computation but not the computation / storing separation. What is the framework concept I need to implement to make this nicer / faster?

import numpy as np
from multiprocessing import Pool
import time

def crunch(n):
    print(f"crunch dummy things for input: {n}")
    results = np.random.random(100)
    time.sleep(np.random.randint(0, 3))
    return results

def store(results_npz, index):
    print(f"storing iteration {index}")
    np.savetxt(f'test_{str(index).zfill(2)}.out', results_npz)

# all tasks
all_tasks = list(range(10))

# iterate over tasks in chunks
for i in range(5):
    print(f"start iteration {i}")
    input_chunk = [all_tasks.pop(0), all_tasks.pop(0)]
    with Pool(2) as mp:
        results = mp.map(crunch, input_chunk)

    print("storing results ...")
    # ideally, this should start and then the result computation can start again
    results_all = np.vstack(results)
    store(results_all, i)  # write the stacked chunk to disk

Edit: Important information! There can only be one process running that stores the results.


Solution

  • You have multiple requirements that all have to be satisfied. The first one is:

    There can only be one process running that stores the results.

    That suggests a Process dedicated to the storage activity. A queue can be used for the individual data-crunch Processes to send data to the storage Process.
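
    A minimal sketch of the single-writer idea, not the full script from this answer: one consumer loop drains a queue until it sees a `None` sentinel. The names `storage_loop`, `write` and `demo` are hypothetical. The loop relies only on `get()`, which both `queue.Queue` and `multiprocessing.Queue` provide; to keep the example small it is driven here by a thread and an in-memory list instead of a separate process and a file.

```python
import queue
import threading


def storage_loop(q, write):
    """Drain q until a None sentinel arrives; return how many items were written."""
    count = 0
    while True:
        item = q.get()      # blocks until a producer has put something
        if item is None:    # sentinel: no more results will arrive
            return count
        write(item)
        count += 1


def demo():
    q = queue.Queue()
    written = []            # stand-in for the real result file
    writer = threading.Thread(target=storage_loop, args=(q, written.append))
    writer.start()
    for result in ("a", "b", "c"):  # stand-in for crunched results
        q.put(result)
    q.put(None)             # tell the writer to shut down
    writer.join()
    return written
```

    Because only `storage_loop` ever calls `write`, there is exactly one writer no matter how many producers put items on the queue.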

    You also want to crunch the data in chunks, then combine each item, in order, with the np.vstack function. The file names will also be incremented in order. I hope I understood this requirement correctly.

    One solution is to use multiple queues to keep the data in order: one queue per slot in the chunk, so with your chunk size of 2 there are two queues.
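
    To illustrate why one queue per slot keeps the results ordered, here is a minimal sketch rather than the script from this answer. It uses threads and `queue.Queue` as stand-ins for the worker processes and manager queues, and `worker` and `collect_in_slot_order` are hypothetical names. Each worker writes only to its own slot's queue, so reading the queues by index returns the results in task order regardless of which worker finishes first.

```python
import queue
import threading
import time


def worker(q, value, delay):
    time.sleep(delay)  # simulate uneven compute time
    q.put(value)


def collect_in_slot_order(values, delays):
    # One queue per slot in the chunk; slot i only ever receives task i's result.
    queues = [queue.Queue(maxsize=1) for _ in values]
    threads = [
        threading.Thread(target=worker, args=(q, v, d))
        for q, v, d in zip(queues, values, delays)
    ]
    for t in threads:
        t.start()
    # Reading the queues by index restores task order,
    # regardless of which worker finished first.
    ordered = [q.get() for q in queues]
    for t in threads:
        t.join()
    return ordered
```

    For example, `collect_in_slot_order([0, 1], [0.2, 0.0])` returns `[0, 1]` even though the second worker finishes first.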

    You also don't want to hold all of the crunched data in memory at the same time. Fixing the queue size regulates the crunching: a crunch process that tries to put onto a full queue blocks until the storage process has taken the previous item. In my code I have set the queue size to 1, which allows the crunching to run ahead of the storage by only one chunk. No more than 3 chunks will ever be in memory at once: the one being written, one on the queue, and one being crunched. Here is a working script using your original test code to simulate the data:

    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor
    import time
    from typing import List
    
    import numpy as np
    
    def crunch(q: mp.Queue, task_data):
        time.sleep(np.random.randint(0, 3))
        x = np.random.random(100)
        x[0] = task_data   # Write the index to the file as a diagnostic
        q.put(x)
        print(f"crunched dummy things for input: {task_data}")
    
    def store(queues):  # queues: shared list with one queue per chunk slot
        n = 0  # count chunks of data
        while True:
            # Collect one cycle of data
            x = [q.get() for q in queues]
            if x[0] is None:
                break
            print(f"storing iteration {n}")
            print("Tasks", [a[0] for a in x])
            results_all = np.vstack(x)
            np.savetxt(f'test_{str(n).zfill(2)}.out', results_all)
            time.sleep(1.5)  # simulate a slow storage step
            print(f"stored iteration {n}")
            n += 1
                
    CHUNKSIZE = 2
    
    def main():
        all_tasks: List[int] = list(range(10))
        with mp.Manager() as mgr:
            # Create a list of queues that is sharable among processes
            queues = mgr.list()
            for _ in range(CHUNKSIZE):
                queues.append(mgr.Queue(1))
            with ProcessPoolExecutor(CHUNKSIZE+1) as pexec:  
                # Launch the storage process
                fut = pexec.submit(store, queues)
                # iterate over tasks in chunks
                chunks = len(all_tasks) // CHUNKSIZE
                for i in range(chunks):
                    print(f"start: iteration {i}")
                    x = all_tasks[i * CHUNKSIZE:(i + 1) * CHUNKSIZE]
                    futures = []
                    for q, data in zip(queues, x):
                        futures.append(pexec.submit(crunch, q, data))
                    print(f"wait for finish: iteration {i}")
                    for future in futures:
                        future.result()
                # Shutdown
                for q in queues:
                    q.put(None)
                # Wait for final shutdown
                print("Final wait")
                fut.result()
        print("All finished")
            
    if __name__ == "__main__":
        main()
    

    The script's output looks something like this (it is not always the same because of the random sleep times):

    start: iteration 0
    wait for finish: iteration 0
    crunched dummy things for input: 1
    crunched dummy things for input: 0
    start: iteration 1
    storing iteration 0
    Tasks [0.0, 1.0]
    wait for finish: iteration 1
    crunched dummy things for input: 2
    stored iteration 0
    crunched dummy things for input: 3
    start: iteration 2
    storing iteration 1
    Tasks [2.0, 3.0]
    wait for finish: iteration 2
    crunched dummy things for input: 4
    stored iteration 1
    crunched dummy things for input: 5
    start: iteration 3
    storing iteration 2
    Tasks [4.0, 5.0]
    wait for finish: iteration 3
    crunched dummy things for input: 6
    crunched dummy things for input: 7
    start: iteration 4
    wait for finish: iteration 4
    stored iteration 2
    crunched dummy things for input: 8
    storing iteration 3
    Tasks [6.0, 7.0]
    crunched dummy things for input: 9
    stored iteration 3
    Final wait
    storing iteration 4
    Tasks [8.0, 9.0]
    stored iteration 4
    All finished
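
    The log above suggests that crunching gets at most one chunk ahead of storage, which is the effect of the queue size of 1. The following is a small sketch of that bound in isolation, assuming plain `queue.Queue` objects in a single thread instead of the manager queues used in the script; `try_run_ahead` is a hypothetical helper. It uses `put_nowait`, which raises `queue.Full` where a blocking `put` would simply wait for the consumer.

```python
import queue


def try_run_ahead(maxsize, n_chunks):
    """Count how many chunks a producer can queue before a bounded queue refuses more."""
    q = queue.Queue(maxsize=maxsize)
    accepted = 0
    for chunk in range(n_chunks):
        try:
            q.put_nowait(chunk)  # a blocking put() would wait here instead of raising
        except queue.Full:
            break
        accepted += 1
    return accepted
```

    With `maxsize=1` only one chunk is accepted while nothing is consuming, so a larger `maxsize` trades more memory for more run-ahead.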