numpy mpi4py

Odd-size numpy arrays send/receive


I would like to gather numpy array contents from all processors onto one. When all arrays have the same size, it works. However, I don't see a natural way of doing the same for arrays whose size depends on the processor. Please consider the following code:

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

if rank >= size/2:
    nb_elts = 5
else:
    nb_elts = 2

# create data
lst = []
for i in xrange(nb_elts):
    lst.append(rank*3+i)
array_lst = numpy.array(lst, dtype=int)

# communicate array
result = []
if rank == 0:
    result = array_lst
    for p in xrange(1, size):
        received = numpy.empty(nb_elts, dtype=numpy.int)
        comm.Recv(received, p, tag=13)
        result = numpy.concatenate([result, received])
else:
    comm.Send(array_lst, 0, tag=13)

My problem is the allocation of "received". How can I know what size to allocate? Do I have to send/receive each array size first?

Based on the suggestion below, I'll go with:

data_array = numpy.ones(rank + 3, dtype=int)
data_array *= rank + 5
print '[{}] data: {} ({})'.format(rank, data_array, type(data_array))

# make all processors aware of data array sizes
all_sizes = {rank: data_array.size}
gathered_all_sizes = comm.allgather(all_sizes)
for d in gathered_all_sizes:
    all_sizes.update(d)

# prepare Gatherv as described by @francis
nbsum = 0
sendcounts = []
displacements = []
for p in xrange(size):
    n = all_sizes[p]
    displacements.append(nbsum)
    sendcounts.append(n)
    nbsum += n

if rank==0:
    result = numpy.empty(nbsum, dtype=numpy.int)
else:
    result = None

comm.Gatherv(data_array,[result, tuple(sendcounts), tuple(displacements), MPI.INT64_T], root=0)

print '[{}] gathered data: {}'.format(rank, result)
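
To see what the counts and displacements actually do, here is a minimal sketch that mimics the Gatherv layout inside a single process, with no MPI involved. It assumes Python 3 and NumPy; simulate_gatherv is a hypothetical helper name used only for illustration, not part of mpi4py.

```python
import numpy as np


def simulate_gatherv(chunks):
    """Mimic, in one process, the buffer layout Gatherv builds on the root.

    `chunks` is a list with one 1-D array per (pretend) rank.
    """
    counts = [len(c) for c in chunks]

    # Displacement of rank p = total number of elements of ranks 0..p-1.
    displs = []
    offset = 0
    for n in counts:
        displs.append(offset)
        offset += n

    # Each chunk is copied into the flat buffer at its own displacement.
    result = np.empty(offset, dtype=np.int64)
    for chunk, n, d in zip(chunks, counts, displs):
        result[d:d + n] = chunk
    return counts, displs, result


# Three pretend ranks, each holding rank + 3 elements equal to rank + 5,
# as in the snippet above.
chunks = [np.full(rank + 3, rank + 5, dtype=np.int64) for rank in range(3)]
counts, displs, result = simulate_gatherv(chunks)
print(counts, displs, result)
```

With three pretend ranks the counts are 3, 4 and 5, so the second chunk starts at offset 3 and the third at offset 7.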

Solution

  • In the code you pasted, both Send() and Recv() use nb_elts as the message length. The problem is that nb_elts is not the same on every process, so the root allocates a receive buffer sized for its own nb_elts rather than for the sender's. When the incoming message is longer than that buffer, the program complains:

    mpi4py.MPI.Exception: MPI_ERR_TRUNCATE: message truncated

    To prevent that, the root process must compute the number of items that each of the other processes sends. Hence, in the loop for p in xrange(1, size), nb_elts must be computed from p, not from rank.

    The following code, based on yours, has been corrected. I would add that the natural way to perform this gathering operation is Gatherv(). See http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html and the mpi4py documentation, for instance. I added the corresponding sample code. The only tricky point is the element type: numpy.int is 64 bits wide here, so the Gatherv() call needs an 8-byte MPI type. The code uses MPI.DOUBLE, which works only because it has the same size; a matching integer type such as MPI.INT64_T would be the cleaner choice.

    from mpi4py import MPI
    import numpy
    
    comm = MPI.COMM_WORLD
    rank = comm.rank
    size = comm.size
    
    if rank >= size/2:
        nb_elts = 5
    else:
        nb_elts = 2
    
    # create data
    lst = []
    for i in xrange(nb_elts):
        lst.append(rank*3+i)
    array_lst = numpy.array(lst, dtype=int)
    
    # communicate array
    result = []
    if rank == 0:
        result = array_lst
        for p in xrange(1, size):
    
            if p >= size/2:
                nb_elts = 5
            else:
                nb_elts = 2
    
            received = numpy.empty(nb_elts, dtype=numpy.int)
            comm.Recv(received, p, tag=13)
            result = numpy.concatenate([result, received])
    else:
        comm.Send(array_lst, 0, tag=13)
    
    if rank==0:
        print "Send Recv, result= "+str(result)
    
    #How to use Gatherv:
    nbsum=0
    sendcounts=[]
    displacements=[]
    
    for p in xrange(0,size):
        displacements.append(nbsum)
        if p >= size/2:
            nbsum+= 5
            sendcounts.append(5)
        else:
            nbsum+= 2
            sendcounts.append(2)
    
    if rank==0:
        print "nbsum "+str(nbsum)
        print "sendcounts "+str(tuple(sendcounts))
        print "displacements "+str(tuple(displacements))
    print "rank "+str(rank)+" array_lst "+str(array_lst)
    print "numpy.int "+str(numpy.dtype(numpy.int))+" "+str(numpy.dtype(numpy.int).itemsize)+" "+str(numpy.dtype(numpy.int).name)
    
    if rank==0:
        result2=numpy.empty(nbsum, dtype=numpy.int)
    else:
        result2=None
    
    comm.Gatherv(array_lst,[result2,tuple(sendcounts),tuple(displacements),MPI.DOUBLE],root=0)
    
    if rank==0:
        print "Gatherv, result2= "+str(result2)
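
When the sizes cannot be recomputed on the root, the two steps (exchange the sizes first, then call Gatherv()) can be combined roughly as follows. This is only a sketch, assuming Python 3, mpi4py and 64-bit integer data; counts_and_displacements and gather_variable are hypothetical helper names, not mpi4py functions.

```python
from itertools import accumulate

import numpy as np


def counts_and_displacements(sizes):
    """Turn per-rank sizes (one entry per rank) into Gatherv counts/displacements."""
    counts = [int(n) for n in sizes]
    # Exclusive prefix sum: rank p starts where ranks 0..p-1 end.
    displs = [0] + list(accumulate(counts))[:-1]
    return counts, displs


def gather_variable(comm, local, root=0):
    """Gather 1-D int64 arrays of rank-dependent length onto `root`.

    Returns the concatenated array on `root` and None on the other ranks.
    """
    # Imported lazily so the helper above can be used without MPI installed.
    from mpi4py import MPI

    local = np.ascontiguousarray(local, dtype=np.int64)

    # Step 1: the root learns how many elements each rank holds.
    sizes = comm.gather(local.size, root=root)  # list on root, None elsewhere

    # Step 2: only the root needs a receive buffer and its layout.
    if comm.Get_rank() == root:
        counts, displs = counts_and_displacements(sizes)
        result = np.empty(sum(counts), dtype=np.int64)
        recvbuf = [result, counts, displs, MPI.INT64_T]
    else:
        result = None
        recvbuf = None

    comm.Gatherv([local, MPI.INT64_T], recvbuf, root=root)
    return result
```

Every rank would call gather_variable(MPI.COMM_WORLD, array_lst) when the script is launched with mpiexec. The send and receive sides both use MPI.INT64_T, so the MPI type matches the NumPy dtype instead of relying on an equally sized stand-in.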