
Multiprocessing: How to mp.map a function storing elements in a list?


I have a program similar to the following:

import time
from multiprocessing import Pool

class a_system():
    def __init__(self,N):
        self.N = N
        self.L = [0 for _ in range(self.N)]
    def comp(self,n):
        self.L[n] = 1
        return self.L
    def reset(self):
        self.L = [0 for _ in range(self.N)]

def individual_sim(iter):
    global B, L, sys
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B

def simulate(N_mc):
    global B, L, sys
    L = [[] for _ in range(N_mc)]
    B = 0
    sys = a_system(N_mc)
    [*map(individual_sim, range(N_mc))]
    # with Pool() as P:
    #     P.map(individual_sim,range(N_mc))
    return L, B

if __name__=="__main__":
    start = time.time()
    L, B = simulate(N_mc=5)
    print(L)
    print(B)
    print("Time elapsed: ",time.time()-start)

Here I would like to parallelise the line [*map(individual_sim, range(N_mc))] with multiprocessing. However, replacing this line with

with Pool() as P:
     P.map(individual_sim,range(N_mc))

returns an empty list of lists.

If instead I use P.map_async, P.imap, or P.imap_unordered, I don't get an error, but the list stays empty and B stays 0.

How can I parallelise this code?

P.S. I have tried ThreadPool from multiprocessing.pool, but I would like to avoid that, because the class a_system, which is a bit more complicated than the one shown here, needs to have a different copy for each worker (I get an exit code 139 (interrupted by signal 11: SIGSEGV)).

P.S.2 I might try to use sharedctypes or Managers (?), but I'm not sure how they work, nor which one I should use (or a combination?).

P.S.3 I have also tried modifying individual_sim as

def individual_sim(iter,B,L,sys):
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B

and to use the following in simulation:

   from functools import partial
   part_individual_sim = partial(individual_sim, B=B, L=L, sys=sys)
   with Pool() as P:
        P.map(part_individual_sim,range(N_mc))

But I still get empty lists.


Solution

  • It's not really clear to me what your business logic is here, but you cannot modify your parent's globals from within your child processes. Separate processes don't share an address space.
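    A minimal demonstration of why this fails (names here are illustrative, not from the question): each child process receives its own copy of the parent's globals, so mutations made inside a worker never reach the parent.

    ```python
    from multiprocessing import Pool

    B = 0

    def worker(i):
        # this modifies the *copy* of B inside the child process only
        global B
        B += i
        return B

    if __name__ == "__main__":
        with Pool(2) as pool:
            # each worker accumulates into its own private B
            results = pool.map(worker, range(5))
        print(B)  # the parent's B is still 0
    ```

    The returned values vary from run to run (they depend on how tasks are distributed across the two workers), but the parent's B is always unchanged.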

    You could make L a Manager.List and B a Manager.Value to modify them from your worker processes, though. Manager objects live in a separate server process, and you modify them through proxy objects. Furthermore, you would need to use a Manager.Lock while modifying these shared objects to prevent data corruption.

    Here is a stripped-down example which should get you started:

    import time
    from multiprocessing import Pool, Manager
    
    
    def individual_sim(mlist, mvalue, mlock, idx):
        # in your real computation, make sure to not hold the lock longer than
        # really needed (e.g. calculations without holding lock)
        with mlock:
            mlist[idx] += 10
            mvalue.value += sum(mlist)
    
    
    def simulate(n_workers, n):
    
        with Manager() as m:
            mlist = m.list([i for i in range(n)])
            print(mlist)
            mvalue = m.Value('i', 0)
            mlock = m.Lock()
    
            iterable = [(mlist, mvalue, mlock, i) for i in range(n)]
    
            with Pool(processes=n_workers) as pool:
                 pool.starmap(individual_sim, iterable)
    
            # convert to non-shared objects before terminating manager
            mlist = list(mlist)
            mvalue = mvalue.value
    
        return mlist, mvalue
    
    
    if __name__=="__main__":
    
        N_WORKERS = 4
        N = 20
    
        start = time.perf_counter()
        L, B = simulate(N_WORKERS, N)
        print(L)
        print(B)
        print("Time elapsed: ",time.perf_counter() - start)
    

    Example Output:

    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
    5900
    Time elapsed:  0.14064819699706277
    
    Process finished with exit code 0
    

    It would also be possible to use Pool's initializer parameter to pass the proxies once upon worker initialization and register them as globals, instead of sending them as regular arguments with every starmap call.
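    A sketch of that initializer variant, reusing the names from the example above (init_worker is an illustrative helper, not part of the multiprocessing API): the proxies are handed over once per worker via initargs and stored as globals inside each worker process, so the task arguments stay lightweight.

    ```python
    from multiprocessing import Pool, Manager


    def init_worker(the_list, the_value, the_lock):
        # runs once in every worker process; registers the proxies as globals
        global mlist, mvalue, mlock
        mlist, mvalue, mlock = the_list, the_value, the_lock


    def individual_sim(idx):
        # only the index travels with each task; proxies are already globals
        with mlock:
            mlist[idx] += 10
            mvalue.value += sum(mlist)


    if __name__ == "__main__":
        n = 20
        with Manager() as m:
            mlist = m.list(range(n))
            mvalue = m.Value('i', 0)
            mlock = m.Lock()

            with Pool(processes=4, initializer=init_worker,
                      initargs=(mlist, mvalue, mlock)) as pool:
                pool.map(individual_sim, range(n))

            # convert to non-shared objects before the manager terminates
            result_list = list(mlist)
            result_value = mvalue.value

        print(result_list)
        print(result_value)
    ```

    The final list and value are the same as with the starmap version, since the lock serializes every update regardless of task order.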

    I've written up a bit more about Manager usage (relevant: nested proxies) here.