Tags: python, python-3.x, multiprocessing, python-multiprocessing

What is the safest method to save files generated by different processes with multiprocessing in Python?


I am totally new to the multiprocessing package. I have built an agent-based model and would like to run a large number of simulations with different parameters in parallel. My model takes an xml file, extracts some parameters, runs a simulation, then generates two pandas dataframes and saves them as pickle files. I am trying to use the multiprocessing.Process() class, but the two dataframes are not saved correctly: for some simulations I get only a single dataframe, for others none at all. Am I using the right class for this type of work? What is the safest method to write my simulation results to disk using the multiprocessing module? I should add that if I launch the simulations sequentially with a simple loop, I get the correct outputs. Thanks for the support.

Below is an example of my code. It is not reproducible because I cannot share the model, which is composed of many modules and xml files.

import time
import multiprocessing
from model import ProtonOC
import random
import os
import numpy as np
import sys
sys.setrecursionlimit(100000)

def load(dir):
    result = list()
    names = list()
    for filename in os.scandir(dir):
        if filename.path.endswith('.xml'):
            result.append(filename.path)
            names.append(filename.name[:-4])
    return result, names

def run(xml, name):
    model = ProtonOC()
    model.override_xml(xml)
    model.run()
    new_dir = os.path.join("C:\\", name)
    os.mkdir(new_dir)
    model.datacollector.get_agent_vars_dataframe().to_pickle(os.path.join(new_dir, "agents" + ".pkl"))
    model.datacollector.get_model_vars_dataframe().to_pickle(os.path.join(new_dir, "model" + ".pkl"))

if __name__ == '__main__':
    paths, names = load("C:\\") #The folder that contains xml files
    processes = []
    for path, name in zip(paths, names):
        p = multiprocessing.Process(target=run, args=(path, name))
        processes.append(p)
        p.start()

    for process in processes:
        process.join()

Solution

  • I can elaborate on my comment, but alas, looking at your code and not knowing anything about your model, I do not see an obvious cause for the problems you mentioned.

    I mentioned in my comment that I would use either a thread pool or a process pool, according to whether your processing is I/O bound or CPU bound, in order to better control the number of threads/processes you create. While threads have less creation overhead, all of the Python bytecode still executes within the same process, so there is no parallelism when executing pure Python code: the Global Interpreter Lock (GIL) has to be acquired first. That is why process pools are generally recommended for CPU-intensive jobs. However, when execution occurs inside runtime libraries implemented in C, as is often the case with numpy and pandas, the Python interpreter releases the GIL and you can still get a high degree of parallelism with threads. I don't know the nature of the processing done by the ProtonOC class instance, but some of it is clearly I/O related. So for now I recommend that you initially try a thread pool, for which I have arbitrarily set a maximum size of 20 (a number I pulled out of thin air). The issue here is that you are doing concurrent operations on your disk, and I don't know whether too many threads will slow down disk operations (do you have a solid-state drive, where arm movement is not an issue?).
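    If you are unsure which case applies, one rough check (a sketch only; the helper name and the example arguments are hypothetical) is to time a single sequential run and compare wall-clock time with CPU time: if the two are close, the work is mostly CPU bound and a process pool is the better fit; if the CPU time is much smaller, the run spends most of its time waiting on I/O and a thread pool should do.

    import time

    def classify_workload(fn, *args):
        # Run fn once and compare wall-clock time with CPU time.
        wall_start = time.perf_counter()   # wall-clock time
        cpu_start = time.process_time()    # CPU time of this process only
        fn(*args)
        wall = time.perf_counter() - wall_start
        cpu = time.process_time() - cpu_start
        print(f"wall: {wall:.1f}s, cpu: {cpu:.1f}s")
        # cpu close to wall       -> mostly CPU bound -> prefer a process pool
        # cpu much less than wall -> mostly I/O bound -> a thread pool is fine

    # e.g. classify_workload(run, some_xml_path, some_name), using the run()
    # function from the example below.
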

    If you run the following code example with MAX_CONCURRENCY set to 1, presumably it should work. Of course, that is not your end goal. But it demonstrates how easily you can set the concurrency.

    import time
    from concurrent.futures import ThreadPoolExecutor as Executor
    from model import ProtonOC
    import random
    import os
    import numpy as np
    import sys
    sys.setrecursionlimit(100000)
    
    def load(dir):
        result = list()
        names = list()
        for filename in os.scandir(dir):
            if filename.path.endswith('.xml'):
                result.append(filename.path)
                names.append(filename.name[:-4])
        return result, names
    
    def run(xml, name):
        model = ProtonOC()
        model.override_xml(xml)
        model.run()
        new_dir = os.path.join("C:\\", name)
        os.mkdir(new_dir)
        model.datacollector.get_agent_vars_dataframe().to_pickle(os.path.join(new_dir, "agents.pkl"))
        model.datacollector.get_model_vars_dataframe().to_pickle(os.path.join(new_dir, "model.pkl"))
    
    if __name__ == '__main__':
        paths, names = load("C:\\") #The folder that contains xml files
        MAX_CONCURRENCY = 20 # never more than this
        N_WORKERS = min(len(paths), MAX_CONCURRENCY)
        with Executor(max_workers=N_WORKERS) as executor:
            executor.map(run, paths, names)
    

    To use a process pool, change:

    from concurrent.futures import ThreadPoolExecutor as Executor
    

    to:

    from concurrent.futures import ProcessPoolExecutor as Executor
    

    You may then wish to change MAX_CONCURRENCY. But because the jobs still involve a lot of I/O and give up the processor when doing this I/O, you might benefit from this value being greater than the number of CPUs you have.
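
    As a starting point for sizing (a sketch only; the factor of 2 is an arbitrary assumption, not a tuned value), you could derive MAX_CONCURRENCY from the CPU count reported by the operating system:

    import os

    # Jobs that mix CPU work with disk I/O can tolerate somewhat more
    # workers than CPUs, since a worker waiting on I/O releases its CPU.
    # The factor of 2 is an assumption you would want to tune empirically.
    MAX_CONCURRENCY = 2 * (os.cpu_count() or 1)
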

    Update

    An alternative to using the map method of the ThreadPoolExecutor class is to use submit. This gives you an opportunity to handle any exception on an individual job-submission basis:

    if __name__ == '__main__':
        paths, names = load("C:\\") #The folder that contains xml files
        MAX_CONCURRENCY = 20 # never more than this
        N_WORKERS = min(len(paths), MAX_CONCURRENCY)
        with Executor(max_workers=N_WORKERS) as executor:
            futures = [executor.submit(run, path, name) for path, name in zip(paths, names)]
            for future in futures:
                try:
                    result = future.result() # return value from run, which is None
                except Exception as e: # any exception run might have thrown
                    print(e) # handle this as you see fit
    

    You should be aware that this submits jobs one by one, whereas map, when used with the ProcessPoolExecutor, allows you to specify a chunksize parameter. When you have a pool size of N and M jobs to submit, where M is much greater than N, it is more efficient to place chunksize jobs at a time on each pool process's work queue, rather than one at a time, to reduce the number of interprocess data transfers required. But as long as you are using a thread pool, this is not relevant.
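
    For completeness, here is a minimal sketch of how chunksize might be passed in the ProcessPoolExecutor variant of the map example above (the value 4 is arbitrary; chunksize is ignored by ThreadPoolExecutor):

    from concurrent.futures import ProcessPoolExecutor as Executor

    if __name__ == '__main__':
        paths, names = load("C:\\") # The folder that contains xml files
        MAX_CONCURRENCY = 20 # never more than this
        N_WORKERS = min(len(paths), MAX_CONCURRENCY)
        with Executor(max_workers=N_WORKERS) as executor:
            # Each worker process pulls 4 jobs from the work queue at a time,
            # reducing the number of interprocess transfers.
            executor.map(run, paths, names, chunksize=4)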