python for-loop python-itertools multiprocess

Mismatch between parallelized and linear nested for loops


I want to parallelize a piece of code that resembles the following:

Ngal=10
sampind=[7,16,22,31,45]
samples=0.3*np.ones((60,Ngal))
zt=[2.15,7.16,1.23,3.05,4.1,2.09,1.324,3.112,0.032,0.2356]
toavg=[]
for j in range(Ngal):
    gal=[]
    for m in sampind:
        gal.append(samples[m][j]-zt[j])
    toavg.append(np.mean(gal))
accuracy=np.mean(toavg)

so I followed the advice here and I rewrote it as follows:

toavg=[]
gal=[]
p = mp.Pool()

def deltaz(params):
    j=params[0] # index of the galaxy
    m=params[1] # indices for which we have sampled redshifts
    gal.append(samples[m][j]-zt[j])
    return np.mean(gal)

j=(np.linspace(0,Ngal-1,Ngal).astype(int))
m=sampind
grid=[j,m]
input=itertools.product(*grid)
results = p.map(deltaz,input)
accuracy=np.mean(results)
p.close()
p.join()

but the results are not the same. In fact, sometimes they are and sometimes they're not, so it doesn't seem deterministic. Is my approach correct? If not, what should I fix? The modules needed to reproduce the above examples are:

import numpy as np
import multiprocess as mp
import itertools

Thank you!


Solution

  • The first issue I see is that gal is a global variable that the function deltaz appends to. Globals are not shared between the pool processes; each process gets its own separate copy. You would have to use shared memory if you wanted them to share this structure. This is probably why you see non-deterministic behavior.

    The next issue is that the two versions do not perform the same computation. The first takes the average of the per-galaxy averages (one gal list per galaxy). The parallel one returns, for every (j, m) pair, the average of whichever elements happen to have accumulated in that worker's gal list so far. This is nondeterministic because tasks are handed to processes as they become available, and that order is not necessarily predictable.
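The second issue can be reproduced without any parallelism. The following is a minimal sketch using the data from the question; the names gal_shared, running_means, serial_accuracy and emulated_accuracy are illustrative and not from the original code. It emulates what a single worker would do if it received every (j, m) pair in order, which is only one of the many possible schedules.

```python
import numpy as np

# Data from the question.
Ngal = 10
sampind = [7, 16, 22, 31, 45]
samples = 0.3 * np.ones((60, Ngal))
zt = [2.15, 7.16, 1.23, 3.05, 4.1, 2.09, 1.324, 3.112, 0.032, 0.2356]

# Serial version: a fresh gal list per galaxy, then a mean of per-galaxy means.
toavg = []
for j in range(Ngal):
    gal = [samples[m][j] - zt[j] for m in sampind]
    toavg.append(np.mean(gal))
serial_accuracy = np.mean(toavg)

# Emulation of deltaz in ONE process: gal is never reset, so every call
# returns the running mean of everything appended so far.
gal_shared = []
running_means = []
for j in range(Ngal):
    for m in sampind:
        gal_shared.append(samples[m][j] - zt[j])
        running_means.append(np.mean(gal_shared))
emulated_accuracy = np.mean(running_means)

# The two numbers differ even though no second process is involved.
print(serial_accuracy, emulated_accuracy)
```

With a real pool, each worker holds its own gal_shared and sees a different subset of the pairs, so the value changes from run to run on top of this mismatch.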

    I would suggest parallelizing the outer loop over galaxies and keeping the inner loop over sampind inside the worker function. To do this, zt and samples can both be placed in shared memory, since they are read by all of the processes. Shared memory can get dangerous if you are modifying data, but since you appear to only be reading it, it should be fine.

    import numpy as np
    import multiprocessing as mp
    import itertools
    import ctypes
    #Non-parallel code
    Ngal=10
    sampind=[7,16,22,31,45]
    samples=0.3*np.ones((60,Ngal))
    zt=[2.15,7.16,1.23,3.05,4.1,2.09,1.324,3.112,0.032,0.2356]
    #Nonparallel
    toavg=[]
    for j in range(Ngal):
        gal=[]
        for m in sampind:
             gal.append(samples[m][j]-zt[j])
        toavg.append(np.mean(gal))
    accuracy=np.mean(toavg)
    print(toavg)
    
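    # Parallel function: one task per galaxy j, with a gal list local to the call
    def deltaz(j):
        gal = []
        for m in sampind:
            # Read from the shared arrays created below, not the originals
            gal.append(sampArr[m][j]-ztArr[j])
        return np.mean(gal)
    # Shared array for zt
    zt_base = mp.Array(ctypes.c_double, len(zt), lock=False)
    ztArr = np.ctypeslib.as_array(zt_base)
    # Shared array for samples (np.prod replaces the removed np.product)
    sample_base = mp.Array(ctypes.c_double, int(np.prod(samples.shape)), lock=False)
    sampArr = np.ctypeslib.as_array(sample_base)
    sampArr = sampArr.reshape(samples.shape)
    # Copy arrays to shared
    sampArr[:,:] = samples[:,:]
    ztArr[:] = zt[:]
    # With the fork start method the workers inherit the shared buffers;
    # with spawn, each worker re-imports this module and rebuilds its own copy.
    # The guard keeps a re-import from creating another pool.
    if __name__ == "__main__":
        with mp.Pool() as p:
            # p.map returns results in input order, so result[j] belongs to galaxy j
            result = p.map(deltaz, range(Ngal))
            print(result)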
    

    The example above produces the same results as the serial loop. You can add more complexity to it as you see fit, but I would read up on multiprocessing in general, and on memory types and scopes, to get an idea of what will and won't work. Multiprocessing requires more care than serial code. Let me know if this doesn't help and I will try to update it so that it does.
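One way to check any parallel version against the serial loop is to compute a reference in a single vectorized NumPy expression. This is a sketch of a different technique (array indexing and broadcasting rather than multiprocessing), not part of the answer's approach; the names per_galaxy, reference_accuracy and the helper matches_reference are hypothetical and introduced only for illustration.

```python
import numpy as np

# Data from the question; zt is converted to an array so it can broadcast.
Ngal = 10
sampind = [7, 16, 22, 31, 45]
samples = 0.3 * np.ones((60, Ngal))
zt = np.array([2.15, 7.16, 1.23, 3.05, 4.1, 2.09, 1.324, 3.112, 0.032, 0.2356])

# samples[sampind] has shape (len(sampind), Ngal); subtracting zt broadcasts
# across the rows. Averaging over axis 0 gives one value per galaxy (toavg).
per_galaxy = (samples[sampind] - zt).mean(axis=0)
reference_accuracy = per_galaxy.mean()


def matches_reference(parallel_result, rtol=1e-12):
    """Hypothetical helper: compare a list of per-galaxy means to the reference."""
    return np.allclose(parallel_result, per_galaxy, rtol=rtol)


print(per_galaxy)
print(reference_accuracy)
```

Passing the list returned by p.map to matches_reference should give True when the pool computes the same per-galaxy means as the serial loop. For arrays of this size the vectorized form is also likely to be faster than starting a pool, though that depends on the real workload.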