Search code examples
pythonnumpypython-multiprocessing

Multi-processing filling of sub-arrays to build a global array by merging all sub-arrays of each process


In Python 2.7, I have to build a 2D array (arrayFullCross_final[u][u]) which contains 16 blocks with size of each block of 100x100 elements. At the beginning, I use a 4D array (arrayFullCross), then I reshape it into 400x400 2D array.

I have a first version (sequential) where I am using the classical python function "map" and a "generator" like this (buildCrossMatrix_loopis the function that I want to apply the generator generatorCrossMatrix) :

# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))


def buildCrossMatrix_loop(params_array):

  # rows indices
  xb = params_array[0]
  # columns indices
  yb = params_array[1]
  # Current redshift
  z = zrange[params_array[2]]

  # Loop inside block
  for ub in range(dimPoints):
    for vb in range(dimPoints):
      # Diagonal terms
      if (xb == yb):
        #arrayFullCross[u][u][w][t] = 2*P_bgs**2 * N_bgs**2
        if (xb == 0):
          N_bgs = (1+1/(n[params_array[2]]*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])), 10**P_m_NW(np.log10(arrayCross_k[ub])), bias2D_array*sig_8_fid, growth_f[params_array[2]]*sig_8_fid, H_orig(z), H_orig(z), D_A_orig(z), D_A_orig(z), params_array[2], 0, 0)))

          arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])), 10**P_m_NW(np.log10(arrayCross_k[ub])), bias2D_array*sig_8_fid, growth_f[params_array[2]]*sig_8_fid, H_orig(z), H_orig(z), D_A_orig(z), D_A_orig(z), params_array[2], 0, 0)**2 * N_bgs**2
...
...

##### MAIN LOOP to fill, at each index i, the array "arrayFullCross" #####
while i < len(zrange):

  ...
  ...

  def generatorCrossMatrix(index):
    for igen in range(dimBlocks):
      for lgen in range(dimBlocks):
        yield igen, lgen, index


  if __name__ == '__main__':
      map(buildCrossMatrix_loop, generatorCrossMatrix(i))
  
  ...      
  ...

i = i+1   

i is just an index into the main loop "while".

With this sequential method, everything works fine and I get the expected big output array arrayFullCross[u][v][x][y] (I checked the values in it and after reshape by 400x400, it's good).

Now, I tried to do the same things but with multiprocessing import Pool. I did :

from multiprocessing import Pool

def buildCrossMatrix_loop(params_array):
...

while i < len(zrange):
...

if __name__ == '__main__':          
      pool = mp.Pool(16)
      pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
      pool.terminate()

      # Reshape 4D array to 2D global array
      arrayFullCross2D_final = arrayFullCross.swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)

      print 'arrayFullCross2D_final = ', arrayFullCross2D_final

But when I print the final output 2D array arrayFullCross2D_final, I get systematically an array filled with only zero values.

arrayFullCross2D_final =  [[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
 ...
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]]

Maybe I have to make shared the 4D array arrayFullCross between the different processes ? How could I perform this ?

How can each process can modify concurrently different parts of the 4D array ?

It seems like this 4D global array is overwritten for each i index of the loop.

Update 1

I forgot to say that I have declared the full array like this (at the beginning of the main, i.e outside the while loop) :

# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))

How can I use the solution with the answer given and my declaration of arrayFullCross? i.e :

manager = Manager()
arrayFullCross = manager.list()

Update 2

I though that I have found a good solution by using ThreadPool with from multiprocessing.dummy import Pool as ThreadPool, in this way:

pool = ThreadPool(16)
pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
pool.close()
pool.join()

But the performances seem to be bad : Indeed, I only see one only process running with top or htop commands, it it normal?

It seems like time is mostly spent by locking the global array to write in it: this is not necessary the case since I fill the global array on independent sub-arrays.

Could I use ThreadPool for this?


Solution

  • The code seems to indeed be correct. However, when you run it in pool mode, each worker will have their own copy of the arrays. They will then write back to the shared memory copy which you never touch, hence the table filled with 0.

    By utilizing shared memory variables the in the multiprocessing module you should be able to share the results with the main thread. You could use a c-type array, but that would greatly increase your code's complexity. The multiprocessing modules provides python-like lists through the Manager submodule. It should be enough to make arrayFullCross a Manager list:

    from multiprocessing import Manager, Pool
    manager = Manager()
    arrayFullCross = manager.list()
    
    def buildCrossMatrix_loop(params_array):
    ...
    
    while i < len(zrange):
    ...
    
    if __name__ == '__main__':          
          pool = mp.Pool(16)
          pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
          pool.terminate()
    
          # Reshape 4D array to 2D global array
          arrayFullCross2D_final = arrayFullCross.swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)
    
          print 'arrayFullCross2D_final = ', arrayFullCross2D_final
    

    It is noteworthy that utilizing a manager object creates a certain level of overhead. If the performance is not to your satisfaction, try using the Array type from multiprocessing.

    More on these resources can be found in the multiprocessing docs.