Search code examples
carraysmatrixmpidistributed-computing

MPI_Scatter and Gather - 2D array, uneven blocks


I'm using MPI and I try to send uneven blocks of 2D array to different processors. For instance if I have not squere image which size is 333x225 and I want to send blocks of different sizes to different processors.

I have seen @Jonathan Dursi method for even arrays: sending blocks of 2D array in C using MPI

I try to adapt it to my problem. So far I managed to send even chunks of data to two processes like this:

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

int malloc2dchar(char ***array, int n, int m) {

    /* allocate the n*m contiguous items */
    char *p = (char *)malloc(n*m*sizeof(char));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = (char **)malloc(n*sizeof(char*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (int i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2dchar(char ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    char **global, **local;
    const int gridsize=10; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2dchar(&global, gridsize, gridsize);
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                global[i][j] = '0'+(3*i+j)%10;
        }


        printf("Global array is:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                putchar(global[i][j]);

            printf("\n");
        }
    }

    /* create the local array which we'll process */

    malloc2dchar(&local, 5, 10);

    /* create a datatype to describe the subarrays of the global array */

    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {5, 10};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
    MPI_Type_create_resized(type, 0, 10*sizeof(char), &subarrtype);
    MPI_Type_commit(&subarrtype);

    char *globalptr=NULL;
    if (rank == 0) globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[2];
    int displs[2];

    if (rank == 0) {
        for (int i=0; i<2; i++) sendcounts[i] = 1;
        int disp = 0;
        displs[0]=0;
        displs[1]=5;

        //for (int i=0; i<procgridsize; i++) {
        //    for (int j=0; j<procgridsize; j++) {
        //        displs[i*procgridsize+j] = disp;
        //        disp += 1;
        //    }
        //    disp += ((gridsize/procgridsize)-1)*procgridsize;
        //}
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/2, MPI_CHAR,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (int p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (int i=0; i<5; i++) {
                putchar('|');
                for (int j=0; j<10; j++) {
                    putchar(local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (int i=0; i<5; i++) {
        for (int j=0; j<10; j++) {
            local[i][j] = 'A' + rank;
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/2,  MPI_CHAR,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2dchar(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++) {
                putchar(global[i][j]);
            }
            printf("\n");
        }

        free2dchar(&global);
    }


    MPI_Finalize();

    return 0;
}

So I get:

Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456

Local process on rank 0 is:
|0123456789|
|3456789012|
|6789012345|
|9012345678|
|2345678901|

Local process on rank 1 is:
|5678901234|
|8901234567|
|1234567890|
|4567890123|
|7890123456|

Processed grid:
AAAAAAAAAA
AAAAAAAAAA
AAAAAAAAAA
AAAAAAAAAA
AAAAAAAAAA
BBBBBBBBBB
BBBBBBBBBB
BBBBBBBBBB
BBBBBBBBBB
BBBBBBBBBB

But I want data to be like this (not even chunks):

    AAAAAAAAAA
    AAAAAAAAAA
    AAAAAAAAAA
    AAAAAAAAAA
    AAAAAAAAAA
    AAAAAAAAAA
    BBBBBBBBBB
    BBBBBBBBBB
    BBBBBBBBBB
    BBBBBBBBBB

UPDATE

I have tried to set tab_size depending on process rank. But it doesn't work completly fine.

Here is the code:

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

int malloc2dchar(char ***array, int n, int m) {

    /* allocate the n*m contiguous items */
    char *p = (char *)malloc(n*m*sizeof(char));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = (char **)malloc(n*sizeof(char*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (int i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2dchar(char ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    char **global, **local;
    const int gridsize=10; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    //if (size != procgridsize*procgridsize) {
    //    fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
    //    MPI_Abort(MPI_COMM_WORLD,1);
    //}

    int tab_size;
    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2dchar(&global, gridsize, gridsize);
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                global[i][j] = '0'+(3*i+j)%10;
        }


        printf("Global array is:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                putchar(global[i][j]);

            printf("\n");
        }
        tab_size = 4;
    }
    if(rank == 1)
    {
        tab_size = 6;
    }

    /* create the local array which we'll process */

    malloc2dchar(&local, tab_size, 10);

    /* create a datatype to describe the subarrays of the global array */

    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {tab_size, 10};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
    MPI_Type_create_resized(type, 0, 10*sizeof(char), &subarrtype);
    MPI_Type_commit(&subarrtype);

    char *globalptr=NULL;
    if (rank == 0) globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[2];
    int displs[2];

    int tabsize;
    if (rank == 0) {
        for (int i=0; i<2; i++) sendcounts[i] = 1;
        int disp = 0;
        displs[0]=0;
        displs[1]=tab_size;

        //for (int i=0; i<procgridsize; i++) {
        //    for (int j=0; j<procgridsize; j++) {
        //        displs[i*procgridsize+j] = disp;
        //        disp += 1;
        //    }
        //    disp += ((gridsize/procgridsize)-1)*procgridsize;
        //}
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/2, MPI_CHAR,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (int p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (int i=0; i<tab_size; i++) {
                putchar('|');
                for (int j=0; j<10; j++) {
                    putchar(local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (int i=0; i<tab_size; i++) {
        for (int j=0; j<10; j++) {
            local[i][j] = 'A' + rank;
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/2,  MPI_CHAR,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2dchar(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++) {
                putchar(global[i][j]);
            }
            printf("\n");
        }

        free2dchar(&global);
    }


    MPI_Finalize();

    return 0;
}

And the output looks like this:

Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|0123456789|
|3456789012|
|6789012345|
|9012345678|
Local process on rank 1 is:
|2345678901|
|5678901234|
|8901234567|
|1234567890|
||
||
[blade001:3727] *** An error occurred in MPI_Gatherv
[blade001:3727] *** reported by process [2497249281,0]
[blade001:3727] *** on communicator MPI_COMM_WORLD
[blade001:3727] *** MPI_ERR_TRUNCATE: message truncated
[blade001:3727] *** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,
[blade001:3727] ***    and potentially your MPI job)

Solution

  • Why is your code wrong

    You define a datatype that should be the same differently on different ranks. That's not the way it is done.

    How to do what you attempt correctly

    A decomposition of contigous data by complete rows, as you describe, is much simpler. There no need for complex derived datatypes, in fact you don't need them at all. You can use a very simple datatype representing a row. Then the only task is to setup the size / displacements of MPI_Scatterv correctly:

    int local_rows[2] = {6, 4};
    
    malloc2dchar(&local, local_rows[rank], gridsize);
    
    MPI_Datatype row_type;
    MPI_Type_contiguous(gridsize, MPI_CHAR, &row_type);
    MPI_Type_commit(&row_type);
    
    int displs[2];
    
    if (rank == 0) {
      displs[0] = 0;
      for (int r = 1; r < 2; r++) {
        displs[r] = displs[r - 1] + local_rows[r - 1];
      }
    }
    
    MPI_Scatterv(globalptr, local_rows, displs, row_type, &(local[0][0]),
                 local_rows[rank], row_type, 0, MPI_COMM_WORLD);
    
    ...
    
    MPI_Gatherv(&(local[0][0]), local_rows[rank], row_type, globalptr, local_rows,
                displs, row_type, 0, MPI_COMM_WORLD);
    

    This assumes that the intended sizes {6, 4} are known by all ranks. You can either have everyone compute it deterministically or have only the root compute that and scatter it (non-root ranks need only know their own row count).

    True irregular 2D decomposition

    If you truely want to split out chunks not only consisting of whole rows, it becomes much more complicated. There is a very good answer about that already, so I won't repeat that here. Make sure to read it very carefully and follow it closely.

    Due to the complexity, I would suggest to only do that if you are absolutely sure you need it.

    Overlap

    You cannot send overlapping data with a single scatter. If you need overlap, consider exchanging the data directly between the neighbouring processes that own the range in a halo exchange.