python-3.x, mpi4py, numpy-slicing

How to scatter/send all possible column pairs to child processes and compute the coherence between the columns using Python mpi4py (parallel computation)?


I have a large matrix (2D array), and for every possible pair of columns I need to compute the coherence in parallel in Python (e.g. with mpi4py). The coherence (a function) should be computed on the child processes, and each child process should send its coherence value to the parent process, which gathers the values into a list. To try this, I've created a small matrix and a list of all possible column pairs as follows:

import numpy as np
from scipy import signal
from itertools import combinations
from mpi4py import MPI


comm = MPI.COMM_WORLD
nproc = comm.Get_size()
rank = comm.Get_rank()

data=np.arange(20).reshape(5, 4)
#List of all possible column pairs
data_col = list(combinations(np.transpose(data), 2)) #list

# Function creation
def myFunc(X,Y):
..................
..................
    return Real_coh

if rank==0:
    Data= comm.scatter(data_col,root=0) #col_pair

Can anyone suggest how to proceed from here? Feel free to ask for any clarification. Thanks.


Solution

  • Check out the following script, which uses comm.Barrier to synchronize the processes. It writes the matrix to a chunked h5py dataset and reads it back one column at a time, which is memory efficient.

    import numpy as np
    from scipy import signal
    from mpi4py import MPI
    import h5py as t
    
    chunk_len = 5000  # Rows per HDF5 chunk
    num_c = 34        # Number of columns in the matrix
    
    # Actual Dataset
    data_mat = np.random.random((10000, num_c))
    
    shape = (chunk_len, data_mat.shape[1])
    chunk_size = (chunk_len, 1)
    no_of_chunks = data_mat.shape[1]
    
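    # Only rank 0 writes the file; otherwise every process would overwrite it at once
    if MPI.COMM_WORLD.Get_rank() == 0:
        with t.File('file_name.h5', 'w') as hf:
            hf.create_dataset("chunked_arr", data=data_mat, chunks=chunk_size, compression='lzf')
    del data_mat
    MPI.COMM_WORLD.Barrier()  # Wait until the file is complete before any rank reads it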
    
    def myFunc(dset_X, dset_Y):
        ..............
        ............
        return Real_coh
    
    res = np.zeros((num_c, num_c))
    
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()
    
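    for i in range(num_c):
        if i % size == rank:  # Round-robin: each rank handles its own columns i
            with t.File('file_name.h5', 'r', libver='latest') as hf:
                dset_X = hf['chunked_arr'][:, i]  # Read a single column
            for j in range(num_c):
                with t.File('file_name.h5', 'r', libver='latest') as hf:
                    dset_Y = hf['chunked_arr'][:, j]  # Read a single column
                res[i][j] = myFunc(dset_X, dset_Y)
    comm.Barrier()
    # Each rank filled only its own rows; sum the partial matrices on rank 0
    total = np.zeros_like(res) if rank == 0 else None
    comm.Reduce(res, total, op=MPI.SUM, root=0)
    if rank == 0:
        print('Shape of final result :', total.shape)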
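
If you would rather keep the scatter/gather layout from the question, here is a minimal sketch of one way it could look. Note that comm.scatter is a collective call: every rank has to call it, and the list on the root needs exactly one entry per process, so the column pairs are first dealt into that many chunks. The names split_pairs and main, the seeded random matrix, and the use of the mean of scipy.signal.coherence in place of the elided myFunc are all assumptions made for illustration, not part of the original code.

```python
from itertools import combinations


def split_pairs(n_cols, n_parts):
    """Deal all column-index pairs (i, j), i < j, round-robin into n_parts lists."""
    pairs = list(combinations(range(n_cols), 2))
    return [pairs[k::n_parts] for k in range(n_parts)]


def main():
    # Imported here so split_pairs stays usable without MPI installed.
    import numpy as np
    from mpi4py import MPI
    from scipy import signal

    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()

    # Hypothetical data: every rank builds the same matrix from a fixed seed.
    data = np.random.default_rng(0).random((1024, 6))

    # Only the root builds the chunks; the other ranks pass None.
    chunks = split_pairs(data.shape[1], size) if rank == 0 else None
    my_pairs = comm.scatter(chunks, root=0)

    # Stand-in for the elided myFunc: mean magnitude-squared coherence.
    my_res = [
        (i, j, float(signal.coherence(data[:, i], data[:, j])[1].mean()))
        for i, j in my_pairs
    ]

    # gather returns a list of per-rank lists on rank 0 and None elsewhere.
    gathered = comm.gather(my_res, root=0)
    if rank == 0:
        results = [item for part in gathered for item in part]
        print(len(results), "pairs processed")
```

To try it, add a call to main() at the bottom of the script and launch it with something like mpiexec -n 4 python script.py. Scattering index pairs rather than the column arrays themselves keeps the messages small, at the cost of each rank needing access to the data (here by rebuilding it, in the script above by reading the HDF5 file).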