
ValueError in MPI scatter method while using 2D list in Python-mpi4py


I have a CSV file that I read into a 2D list, and I want to use the scatter method in MPI (mpi4py) to send a different chunk of this list to each process, as follows:

df = []
with open("data_tiny.csv") as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    for row in csv_reader:
        df.append(row)

recvbuf = [[""] * (len(df[0])) for _ in range(math.ceil(len(df)//size))]  
recvbuf= comm.scatter(df, root=0)
print('Rank: ',rank, ', recvbuf received: ',recvbuf)
for t in recvbuf[:]:
  if t[7] != 'o3':
    recvbuf.remove(t)
comm.gather(recvbuf, df, root=0)
if rank == 0:
   print('Rank: ',rank, ', recvbuf received: ',df)

I get the following error:

Traceback (most recent call last):
File "MPI_1.py", line 21, in <module>
   recvbuf= comm.scatter(df, root=0)
File "mpi4py/MPI/Comm.pyx", line 1267, in mpi4py.MPI.Comm.scatter
File "mpi4py/MPI/msgpickle.pxi", line 730, in mpi4py.MPI.PyMPI_scatter
File "mpi4py/MPI/msgpickle.pxi", line 119, in mpi4py.MPI.Pickle.dumpv
ValueError: expecting 4 items, got 54

The error says that scatter expects 4 items but got 54 (df, the 2D list, has 54 rows, which is why it reports 54). My question is: how can I pass a 2D list to the scatter method (without using NumPy) and resolve this error?

The input data has 9 columns and 54 rows, such as:

 a,  aa, aaa, aaaa, aaaaa, aaaaaa, ab, abb, abbb
 a1,  aa1, aaa1, aaaa1, aaaaa1, aaaaaa1, ab1, abb1, abbb1
 a2,  aa2, aaa2, aaaa2, aaaaa2, aaaaaa2, ab2, abb2, abbb2
 a3,  aa3, aaa3, aaaa3, aaaaa3, aaaaaa3, ab3, abb3, abbb3
 .....
 .....

Solution

  • ValueError: expecting 4 items, got 54

    This happens because the scatter routine:

    recvbuf= comm.scatter(df, root=0)
    

    expects that df has the same length as the number of processes running (i.e., comm.size).

    Since you are running with 4 processes and df has 54 elements you get the error.

    > ValueError: expecting 4 items, got 54
    

    To solve this, you need to pack df so that it contains as many elements as there are processes, where each element can be a list holding the items to be sent to the corresponding process.

    For example, say you are running with 4 processes and df=[1,2,3,4,5,6,7,8]. You would need to make df=[[1,2],[3,4],[5,6],[7,8]], so that df[0] goes to process 0, df[1] to process 1, and so on.

    An example of a possible solution:

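    import csv
    from mpi4py import MPI
    
    def split(a, n):
        # Split list a into n contiguous chunks whose lengths differ by at most 1.
        k, m = divmod(len(a), n)
        return [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]
    
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()
    
    df = []
    if rank == 0:
        # Only the root reads the file; the send object is ignored on other ranks.
        with open("data_tiny.csv") as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')
            for row in csv_reader:
                df.append(row)
        # Repack the rows into exactly `size` chunks, one per process.
        df = split(df, size)
    
    # Each rank receives its own chunk (a list of rows).
    recvbuf = comm.scatter(df, root=0)
    print(recvbuf)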
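
The question also filters each chunk and gathers the results, which the example above leaves out. Below is a minimal sketch of that full round trip, simulated in a single process so that it runs without MPI. The helper names `keep_rows` and `simulate_round_trip` are hypothetical, and the sample rows are made up for illustration. In a real mpi4py program, the chunk list would be passed to `comm.scatter(chunks, root=0)` and each rank would hand back its filtered rows with `comm.gather(filtered, root=0)`, which returns a list of per-rank lists on the root and `None` on the other ranks.

```python
def split(a, n):
    """Split list a into n contiguous chunks whose lengths differ by at most 1."""
    k, m = divmod(len(a), n)
    return [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]


def keep_rows(rows, column=7, value="o3"):
    """Return only the rows whose entry in `column` equals `value`."""
    return [row for row in rows if row[column] == value]


def simulate_round_trip(rows, size):
    """Mimic scatter -> per-rank filter -> gather inside one process."""
    chunks = split(rows, size)   # what rank 0 would pass to comm.scatter
    assert len(chunks) == size   # scatter needs exactly one item per rank
    # One filtered list per simulated rank, like the result of comm.gather on the root.
    gathered = [keep_rows(chunk) for chunk in chunks]
    # Flatten the per-rank lists back into a single 2D list.
    return [row for part in gathered for row in part]


# Made-up data: 54 rows of 9 columns, with every third row tagged "o3" in column 7.
rows = [[f"r{i}c{j}" for j in range(9)] for i in range(54)]
for i in range(0, 54, 3):
    rows[i][7] = "o3"

chunk_sizes = [len(chunk) for chunk in split(rows, 4)]
result = simulate_round_trip(rows, 4)
print(chunk_sizes)
print(len(result))
```

Building a new filtered list, rather than removing items from the received chunk while looping over it, keeps the filtering step easy to reason about. Because the chunks are contiguous and are gathered in rank order, the flattened result keeps the original row order.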