I would like to gather numpy array contents from all processors to one. In case all arrays are of the same size, it works. However I don't see a natural way of doing the same task for arrays of proc-dependent size. Please consider the following code:
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
if rank >= size/2:
    nb_elts = 5
else:
    nb_elts = 2
# create data
lst = []
for i in xrange(nb_elts):
    lst.append(rank*3+i)
array_lst = numpy.array(lst, dtype=int)
# communicate array
result = []
if rank == 0:
    result = array_lst
    for p in xrange(1, size):
        received = numpy.empty(nb_elts, dtype=numpy.int)
        comm.Recv(received, p, tag=13)
        result = numpy.concatenate([result, received])
else:
    comm.Send(array_lst, 0, tag=13)
My problem is at the "received" allocation. How can I know what is the size to be allocated? Do I have to first send/receive each array size?
Based on a suggestion below, I'll go with
data_array = numpy.ones(rank + 3, dtype=int)
data_array *= rank + 5
print '[{}] data: {} ({})'.format(rank, data_array, type(data_array))
# make all processors aware of data array sizes
all_sizes = {rank: data_array.size}
gathered_all_sizes = comm.allgather(all_sizes)
for d in gathered_all_sizes:
    all_sizes.update(d)
# prepare Gatherv as described by @francis
nbsum = 0
sendcounts = []
displacements = []
for p in xrange(size):
    n = all_sizes[p]
    displacements.append(nbsum)
    sendcounts.append(n)
    nbsum += n
if rank == 0:
    result = numpy.empty(nbsum, dtype=numpy.int)
else:
    result = None
comm.Gatherv(data_array, [result, tuple(sendcounts), tuple(displacements), MPI.INT64_T], root=0)
print '[{}] gathered data: {}'.format(rank, result)
In the code you pasted, both Send() and Recv() use nb_elts elements. The problem is that nb_elts is not the same for every process. Hence, the number of items the root expects to receive does not match the number of elements that were sent, and the program complains:
mpi4py.MPI.Exception: MPI_ERR_TRUNCATE: message truncated
To prevent that, the root process must compute the number of items that each of the other processes has sent. Hence, in the loop for p in xrange(1, size), nb_elts must be computed according to p, not rank.
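As a minimal sketch of that idea, assuming the same size rule as in your code, the count can be put in a small helper that both the sender and the root call. The name nb_elts_for is hypothetical, and size = 4 is only an illustrative process count. No MPI call is made here.

```python
def nb_elts_for(p, size):
    """Number of elements owned by rank p (hypothetical helper, same rule as the question)."""
    # size // 2 reproduces the integer division that size/2 performs in Python 2
    return 5 if p >= size // 2 else 2


size = 4  # assumed number of processes, for illustration only

# Buffer length the root would allocate before each Recv from rank p
recv_sizes = [nb_elts_for(p, size) for p in range(1, size)]
print(recv_sizes)
```

Since the rule lives in one place, the receive buffer on the root always matches what rank p sends.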
The following code, based on yours, has been corrected. I would add that the natural way to perform this gathering operation is to use Gatherv(). See http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html and the documentation of mpi4py for instance. I added the corresponding sample code. The only tricky point is that numpy.int is 64 bits long here. Hence, Gatherv() needs a 64-bit MPI type: the sample uses MPI.DOUBLE, which works only because it has the same size, whereas MPI.INT64_T is the matching integer type.
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
if rank >= size/2:
    nb_elts = 5
else:
    nb_elts = 2
# create data
lst = []
for i in xrange(nb_elts):
    lst.append(rank*3+i)
array_lst = numpy.array(lst, dtype=int)
# communicate array
result = []
if rank == 0:
    result = array_lst
    for p in xrange(1, size):
        if p >= size/2:
            nb_elts = 5
        else:
            nb_elts = 2
        received = numpy.empty(nb_elts, dtype=numpy.int)
        comm.Recv(received, p, tag=13)
        result = numpy.concatenate([result, received])
else:
    comm.Send(array_lst, 0, tag=13)
if rank==0:
    print "Send Recv, result= "+str(result)
#How to use Gatherv:
nbsum=0
sendcounts=[]
displacements=[]
for p in xrange(0,size):
    displacements.append(nbsum)
    if p >= size/2:
        nbsum+= 5
        sendcounts.append(5)
    else:
        nbsum+= 2
        sendcounts.append(2)
if rank==0:
    print "nbsum "+str(nbsum)
    print "sendcounts "+str(tuple(sendcounts))
    print "displacements "+str(tuple(displacements))
print "rank "+str(rank)+" array_lst "+str(array_lst)
print "numpy.int "+str(numpy.dtype(numpy.int))+" "+str(numpy.dtype(numpy.int).itemsize)+" "+str(numpy.dtype(numpy.int).name)
if rank==0:
    result2=numpy.empty(nbsum, dtype=numpy.int)
else:
    result2=None
comm.Gatherv(array_lst,[result2,tuple(sendcounts),tuple(displacements),MPI.DOUBLE],root=0)
if rank==0:
    print "Gatherv, result2= "+str(result2)
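To see how the counts and displacements passed to Gatherv() lay out the receive buffer, here is a small MPI-free sketch in Python 3 (3.8 or later, for the initial argument of accumulate). The helper names counts_and_displacements and simulate_gatherv are hypothetical. The second one only imitates, with plain lists, where each rank's chunk would land on the root; it is not how mpi4py implements the call.

```python
from itertools import accumulate


def counts_and_displacements(sizes):
    """Return (sendcounts, displacements, total) for a list of per-rank sizes."""
    sendcounts = list(sizes)
    # Displacement of rank p = number of elements held by ranks 0..p-1
    displacements = list(accumulate(sendcounts, initial=0))[:-1]
    return sendcounts, displacements, sum(sendcounts)


def simulate_gatherv(chunks):
    """Place each rank's chunk at its displacement, as the root buffer would be filled."""
    sendcounts, displacements, total = counts_and_displacements(
        [len(c) for c in chunks]
    )
    result = [None] * total
    for chunk, count, disp in zip(chunks, sendcounts, displacements):
        result[disp:disp + count] = chunk
    return result


# Data that 4 ranks would hold with the rule rank*3+i (2 or 5 elements each)
chunks = [[0, 1], [3, 4], [6, 7, 8, 9, 10], [9, 10, 11, 12, 13]]
sendcounts, displacements, total = counts_and_displacements([len(c) for c in chunks])
gathered = simulate_gatherv(chunks)
print(sendcounts, displacements, total)
print(gathered)
```

The same helper accepts any list of sizes, for instance sizes collected from every rank beforehand, so the counts need not follow a fixed rule.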