Tags: c, performance, parallel-processing, mpi, hpc

Failed to add up elements in the array in parallel with several strides (MPI)


I am trying to add up the elements of an array in parallel. I have an example of the algorithm I am following, which adds up the elements at different strides in the array:

input = [3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57]

First Stride (Add every N/2^1 to N/2^1 + N/2^(1+1)):
input = [ 3,10,1,22,8,28,4,53,4,4,0,57,0,0,0,57]

Second Stride (Add every N/2^2 to N/2^2 + N/2^(2+1)):
input = [3,10,1,22,8,50,4,53,4,57,0,57,0,57,0,57]

Third Stride (Add every N/2^3 to N/2^3 + N/2^(3+1)):
input = [3,10,11,22,30,50,54,53,57,57,57,57,57,57,57,57]
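
For reference, the same stride pattern can be written as a plain serial loop (a sketch based only on the example above, not my MPI code), which reproduces the three strides exactly:

#include <stdio.h>

int main(void)
{
    int input[16] = {3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57};
    int n = 16;

    //stride halves each pass: N/2, N/4, ..., 2
    for (int s = n/2; s > 1; s /= 2)
    {
        //add the last element of the previous block into the
        //last element of the first half of the current block
        for (int i = s; i < n; i += s)
            input[i + s/2 - 1] += input[i - 1];
    }

    for (int i = 0; i < n; i++)
        printf("%d ", input[i]);
    printf("\n");
    return 0;
}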

I wrote the code to distribute the adding work equally among my processes. (Note that I am trying to avoid using MPI_Scan.)

Each process has a temp value that holds its changed array element and MPI_Gathers it back to the root; the root then updates the whole input array and MPI_Bcasts the input to every process so the adding work can continue in the next stride.

However, my result does not come out as I expect. I would appreciate it if anyone could tell me what I did wrong in my code.

Here is my code: (Updated)

int DownPhaseFunction(int* input, int size_per_process, int rank, int totalarray, int size, int* Temp0)
{
    //rank is the id of processor
    //size is the total number of processors

    int temp =0;
    int index = 0;
    int index0 = 0;

    //First Stride
    if(rank == 0)
    {
      input[(totalarray)-(totalarray/2)+(totalarray/4)-1] += input[(totalarray)-(totalarray/2)-1];
    }

    MPI_Bcast(input,totalarray,MPI_INT,0,MPI_COMM_WORLD);

    //Start with Second Stride to the end
    for (int i=4 ; i<totalarray ; i*=2)
    {
        //if the number of elements to be changed is larger than the total number of processors, do a few more strides
        for(int j=0; j<=i; j+=(size*totalarray/i))
        {
            index = ((rank+1)*totalarray/i) + j;
            if (index != totalarray)
            {
                temp = input[(index+(totalarray/i)/2)-1] + input[index-1];
            }
            else
            {
                temp = input[index-1];
            }

            //Gather the changed elements back to root
            MPI_Gather(&temp, size, MPI_INT, Temp0, size, MPI_INT, 0, MPI_COMM_WORLD);

            //Let root change the changed elements in the input array
            if(rank == 0)
            {
                for(int r=0; r<size; r++)
                {
                    index0 = ((r+1)*totalarray/i) + j;

                    if(index0 != totalarray)
                    {
                        input[(index0+(totalarray/i)/2-1)] = Temp0[r];
                    }
                }
            }

            //send it back to every processor to do the next stride
            MPI_Bcast(input, totalarray, MPI_INT, 0, MPI_COMM_WORLD);
        }
    }

    return(*input);
}

Solution

  • Each process has a temp value that holds its changed array element and MPI_Gathers it back to the root; the root then updates the whole input array and MPI_Bcasts the input to every process so the adding work can continue in the next stride.

    IMO this design complicates matters. I would suggest first explicitly setting the boundaries of the input array range that each process is allowed to work on. For an input of 16 elements it would be as follows:

    Process 0 works from [0 to 4[
    Process 1 works from [4 to 8[
    Process 2 works from [8 to 12[
    Process 3 works from [12 to 16[
    

    To calculate those ranges one can use the following formulas:

        int begin = rank * size_per_process;
        int end =  (rank + 1) * size_per_process;
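
    In context these bounds follow directly from the rank and communicator size; note that this sketch assumes totalarray is evenly divisible by the number of processes (as in the complete example further down):

        int rank, size;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);

        int size_per_process = totalarray / size;   // assumes totalarray % size == 0
        int begin = rank * size_per_process;        // first index this rank owns
        int end   = (rank + 1) * size_per_process;  // one past the last index it owns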
    

    To implement the remaining logic we start with a loop that first splits the array in half and then, on each iteration, keeps splitting it into halves.

        int split_size = totalarray/2;
        while(split_size > 1){
            //....
            split_size = split_size/2;
        }
    

    We need an additional loop to iterate over the input array using the current split size, namely:

            for(int i = split_size; i < totalarray; i+= split_size){
                   //... 
            }  
    

    Each rank is only allowed to work on the section of the array assigned to that process, namely:

       for(int i = split_size; i < totalarray; i+= split_size){
           int dest = i + (split_size/2) - 1;
           if(begin <= dest && dest < end)
              input[dest] += input[i -1];
        }
    

    An improved (but less readable) version:

       int shift = (split_size/2) - 1;
       // first destination (k*split_size + shift, with k >= 1) that falls at or after this rank's 'begin'
       int dest = ((begin <= shift) ? split_size
                                    : ((begin - shift + split_size - 1) / split_size) * split_size) + shift;
       for(; dest < end; dest += split_size)
          input[dest] += input[dest - shift - 1];
      
    

    After each stride all processes send their section of the array to the other processes:

     MPI_Allgather(MPI_IN_PLACE, size_per_process, MPI_INT, input, size_per_process, MPI_INT, MPI_COMM_WORLD);
    

    The MPI_IN_PLACE ensures that the new input array (resulting from gathering the work done by all processes) replaces the old input array. For an input of 16 elements and 4 processes, processes 0, 1, 2, and 3 will send the elements [0 to 4[, [4 to 8[, [8 to 12[, and [12 to 16[ of their input arrays to all the other processes, respectively. Consequently, at the end of the MPI_Allgather call, every process has the most up-to-date input array, with all the parts that were changed by the processes during the current iteration.
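
    For comparison, here is a sketch of the same exchange without MPI_IN_PLACE, gathering into a separate scratch buffer and copying it back (the gathered buffer and its fixed size are my own illustration):

     int gathered[16];   // scratch buffer sized for the 16-element toy input
     MPI_Allgather(&input[begin], size_per_process, MPI_INT,
                   gathered, size_per_process, MPI_INT, MPI_COMM_WORLD);
     for (int i = 0; i < totalarray; i++)
         input[i] = gathered[i];

    MPI_IN_PLACE avoids both the scratch buffer and the copy: each process's own block is read directly from, and the gathered result written directly back into, input.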

    So for the input = [3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57] with 4 processes the iterations will look like the following:

    First stride:

    Process 2 input[11] += input[7] (input[11] = 4 + 53) 
    

    input array: [3, 10, 1, 22, 8, 28, 4, 53, 4, 4, 0, 57, 0, 0, 0, 57]

    Second stride:

    Process 1 input[5] += input[3] (input[5] = 28 + 22)
    Process 2 input[9] += input[7] (input[9] = 4 + 53)
    Process 3 input[13] += input[11] (input[13] = 0 + 57)
    

    input array: [3, 10, 1, 22, 8, 50, 4, 53, 4, 57, 0, 57, 0, 57, 0, 57]

    Third stride:

    Process 0 input[2] += input[1] (input[2] = 1 + 10)
    Process 1 input[4] += input[3] (input[4] = 8 + 22)
    Process 1 input[6] += input[5] (input[6] = 4 + 50)
    Process 2 input[8] += input[7] (input[8] = 4 + 53)
    Process 2 input[10] += input[9] (input[10] = 0 + 57)
    Process 3 input[12] += input[11] (input[12] = 0 + 57)
    Process 3 input[14] += input[13] (input[14] = 0 + 57)
    

    input = [3, 10, 11, 22, 30, 50, 54, 53, 57, 57, 57, 57, 57, 57, 57, 57]

    A complete running example:

    #include <stdio.h>
    #include <stdlib.h>
    #include <mpi.h>
    
    void printArray(int *array, int size){
        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        if(rank == 0){
            for(int i = 0; i < size; i++)
                printf("%2d ", array[i]);
            printf("\n");
        }
    }
    
    
    int main(int argc, char **argv){
        int totalarray = 16;
        int rank, size;
        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);
        int input[16] = {3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57}; 
    
        printArray(input, totalarray);
    
        int size_per_process = totalarray/size;
        int begin = rank * size_per_process;
        int end =  (rank + 1) * size_per_process;
        int split_size = totalarray/2;
        while(split_size > 1){
            int shift = (split_size/2) - 1;
            // first destination (k*split_size + shift, with k >= 1) at or after this rank's 'begin'
            int dest = ((begin <= shift) ? split_size
                                         : ((begin - shift + split_size - 1) / split_size) * split_size) + shift;
            for(; dest < end; dest += split_size)
                input[dest] += input[dest - shift - 1];
            MPI_Allgather(MPI_IN_PLACE, size_per_process, MPI_INT, input, size_per_process, MPI_INT, MPI_COMM_WORLD);
            split_size = split_size/2;
        }
            
        printArray(input, totalarray);
        MPI_Finalize();
    
        return 0;
    }
           
    

    Input : {3,10,1,22,8,28,4,53,4,4,0,4,0,0,0,57}
    Output: {3,10,11,22,30,50,54,53,57,57,57,57,57,57,57,57}

    Bear in mind that this is a toy example of the proposed design, not bulletproof, production-ready code.