Tags: python, python-multiprocessing

Pickle/unpickle only once per worker


I am using the Python multiprocessing module to spread, say, 10000 steps of a given task over 4 workers using a Pool. The task that is sent to the workers is a method of a complex object. If I understand the documentation correctly, pickle is used to dump and load the object at each step, which means 10000 pickling/unpickling calls. My problem is that the pickled object is quite complex (it contains many aggregations of nested complex objects), and the pickling/unpickling process takes some time. As a result, my job is much slower when run with multiprocessing than with a single process. My question is the following: is there a way to do the pickle/unpickle process only once per worker instead of once per step?

EDIT: The code I am trying to parallelize has the following (simplified) structure:

import multiprocessing

class Analysis:

    def run_step(self):
        print('run_step')

    def __getstate__(self):
        # Called every time the object is pickled -- once per submitted step.
        print('I dump')
        return self.__dict__

    def __setstate__(self, state):
        # Called every time the object is unpickled in a worker.
        print('I load')
        self.__dict__ = state

if __name__ == '__main__':
    a = Analysis()

    pool = multiprocessing.Pool(4)

    # Each apply_async call pickles the bound method (and therefore the
    # whole Analysis instance), so 'I dump' is printed once per step.
    for i in range(10):
        pool.apply_async(a.run_step)
    pool.close()
    pool.join()

Solution

  • How about using Processes instead?

    If such a structure is feasible for your use case, you can create a worker function that runs whatever target function you require, then start the workers using multiprocessing.Process as shown below:

    import math
    import multiprocessing
    
    class Analysis:
    
        def run_step(self):
            print('run_step')
    
        def __getstate__(self):
            print('I dump')
            return self.__dict__
    
        def __setstate__(self, state):
            print('I load')
            self.__dict__ = state
    
    def worker(target, num):
        # The target (and the Analysis object bound to it) is transferred
        # to this process once, at startup -- not once per call.
        for _ in range(num):
            target()
    
    if __name__ == "__main__":
        a = Analysis()
        proc = []
        proc_num = 4
        runs = 10
        per_proc_run = math.ceil(runs/proc_num)  # A little inaccurate but I am sure you can figure something out :)
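        # (My addition, not in the original answer: an exact split that sums
        # to `runs`, e.g. 10 runs over 4 processes -> [3, 3, 2, 2].)
        # per_proc_runs = [runs // proc_num + (1 if i < runs % proc_num else 0)
        #                  for i in range(proc_num)]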
    
        for _ in range(proc_num):
            proc.append(multiprocessing.Process(target=worker, args=(a.run_step, per_proc_run)))
            proc[-1].start()
    
        for process in proc:
            process.join()
    

    Output:

    I dump
    I dump
    I dump
    I dump
    I load
    run_step
    run_step
    run_step
    I load
    run_step
    run_step
    run_step
    I load
    run_step
    run_step
    run_step
    I load
    run_step
    run_step
    run_step
    

    This pickles/unpickles the object only once per worker. You could probably replicate the same behaviour in a Pool (see the sketch below), but I find this more straightforward.
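
    For completeness, here is a minimal sketch of the Pool variant, assuming a worker initializer that stores the object in a module-level global so it is transferred once per worker rather than once per task (the names `init_worker`, `run_one_step` and `_analysis` are my own, not from the question):

    import multiprocessing

    class Analysis:

        def run_step(self):
            print('run_step')

        def __getstate__(self):
            print('I dump')
            return self.__dict__

        def __setstate__(self, state):
            print('I load')
            self.__dict__ = state

    _analysis = None  # worker-local copy, filled in once per process

    def init_worker(analysis):
        # Runs once in each worker process; `analysis` travels with the
        # worker's startup arguments, not with every task.
        global _analysis
        _analysis = analysis

    def run_one_step(_):
        # Each task ships only a small integer; the heavy object already
        # lives in this process.
        _analysis.run_step()

    if __name__ == "__main__":
        a = Analysis()
        with multiprocessing.Pool(4, initializer=init_worker,
                                  initargs=(a,)) as pool:
            pool.map(run_one_step, range(10))

    With the spawn start method this should print 'I dump'/'I load' once per worker; with fork the object is inherited by the children without any pickling at all.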