Search code examples
cmpidistributed-computing

Async leader election in unrooted spanning tree declares multiple winners


I am trying to implement the algorithm described in the image, using MPI. It is part of a University project where we are building a distributed satellite to ground station communication system. I have made multiple iterations of it, but I can never manage to break ties when multiple candidate winners are elected.

async leader election algorithm

Here is the function that handles the election process. This function is called for every single Ground station process (there are 5 in their communication group, GS 0, 1, 2, 3 and 4) and is started via a START_LELECT_GS event from a coordinator process (rank 10).

void perform_gs_leader_election(int coordinator_rank, int rank, MPI_Comm comm) {
    MPI_Status status;
    int is_leader = 0;
    int received_count = 0;
    int neighbor_received_from[num_of_neighbors];

    printf("Process %d: Starting leader election\n", rank);

    // Determine if the process is a leaf node
    const int is_leaf = (num_of_neighbors == 1);
    printf("Process %d: Is leaf? %d\n", rank, is_leaf);

    // Send initial <ELECT> messages only if the process is a leaf node
    if (is_leaf) {
        MPI_Send(&rank, 1, MPI_INT, neighbor_gs[0], ELECT, comm);
        printf("Leaf Process %d: Sent ELECT to %d\n", rank, neighbor_gs[0]);
    }

    // Receive and process messages from neighbors
    while (1) {
        int sender_rank;
        MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
        printf("Process %d: Received message from %d with tag %d\n", rank, status.MPI_SOURCE, status.MPI_TAG);

        if (status.MPI_TAG == ELECT) {
            received_count++;
            neighbor_received_from[status.MPI_SOURCE] = 1;
            printf("Process %d: Received ELECT from %d\n", rank, status.MPI_SOURCE);

            // If all neighbors except one have sent ELECT messages, probe for incoming messages before sending ELECT to the remaining neighbor
            if (received_count == num_of_neighbors - 1) {
                int flag;
                MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, &status);

                if (flag && status.MPI_TAG == TERMINATE_LELECT_GS) {
                    printf("Process %d: Received TERMINATE_LELECT_GS from %d\n", rank, status.MPI_SOURCE);

                    if (status.MPI_SOURCE > rank) {
                        printf("Process %d: Surrendering leadership to process %d\n", rank, status.MPI_SOURCE);
                        MPI_Send(&rank, 1, MPI_INT, status.MPI_SOURCE, GS_LEADER, comm);
                        is_leader = 0;
                    } else {
                        printf("Process %d: Declaring leadership over process %d\n", rank, status.MPI_SOURCE);
                        is_leader = 1;
                    }

                    // send out terminate to all neighbors but the sender
                    for (int i = 0; i < num_of_neighbors; i++) {
                        if (neighbor_gs[i] != status.MPI_SOURCE) {
                            MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
                            printf("Process %d: Forwarded TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
                        }
                    }
                    break;
                }

                if (!flag) {
                    // if we got elect from all but one, send elect to remaining neighbor
                    for (int i = 0; i < num_of_neighbors; i++) {
                        if (!neighbor_received_from[neighbor_gs[i]]) {
                            MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], ELECT, comm);
                            printf("Process %d: Sent ELECT to remaining neighbor %d\n", rank, neighbor_gs[i]);
                            break;
                        }
                    }
                }
            }

            // If all neighbors have sent ELECT messages, this process is a candidate for the leader
            if (received_count == num_of_neighbors) {
                printf("Process %d: Candidate for leader\n", rank);
                is_leader = 1;
                // send terminate to all neighbors
                for (int i = 0; i < num_of_neighbors; i++) {
                    MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
                    printf("Process %d: Sent TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
                }
                break;
            }
        } else if (status.MPI_TAG == TERMINATE_LELECT_GS) {
            printf("Process %d: Received TERMINATE_LELECT_GS from %d\n", rank, status.MPI_SOURCE);

            for (int i = 0; i < num_of_neighbors; i++) {
                if (neighbor_gs[i] != status.MPI_SOURCE) {
                    MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
                    printf("Process %d: Forwarded TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
                }
            }
            break;
        } else if (status.MPI_TAG == GS_LEADER) {
            printf("Process %d: Declared leader due to GS_LEADER message\n", rank);
            is_leader = 1;
            for (int i = 0; i < num_of_neighbors; i++) {
                if (neighbor_gs[i] != status.MPI_SOURCE) {
                    MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
                    printf("Process %d: Sent TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
                }
            }
            break;
        }
    }

    MPI_Barrier(comm);
    printf("Process %d: Terminated\n", rank);
    if (is_leader) {
        printf("Process %d: Sending LELECT_GS_DONE to coordinator\n", rank);
        MPI_Send(&rank, 1, MPI_INT, coordinator_rank, LELECT_GS_DONE, MPI_COMM_WORLD);
    }
}

Here is an example of a problematic output:

parsing START_LELECT_GS event
Process 0: Starting leader election
Process 0: Is leaf? 0
Process 1: Starting leader election
Process 1: Is leaf? 0
Process 4: Starting leader election
Process 4: Is leaf? 1
Leaf Process 4: Sent ELECT to 0
Process 2: Starting leader election
Process 3: Starting leader election
Process 3: Is leaf? 1
Leaf Process 3: Sent ELECT to 1
Process 2: Is leaf? 1
Leaf Process 2: Sent ELECT to 1
Process 0: Received message from 4 with tag 22
Process 0: Received ELECT from 4
Process 0: Sent ELECT to remaining neighbor 1
Process 1: Received message from 3 with tag 22
Process 1: Received ELECT from 3
Process 1: Received message from 2 with tag 22
Process 1: Received ELECT from 2
Process 1: Sent ELECT to remaining neighbor 0
Process 1: Received message from 0 with tag 22
Process 1: Received ELECT from 0
Process 1: Candidate for leader
Process 1: Sent TERMINATE_LELECT_GS to 3
Process 1: Sent TERMINATE_LELECT_GS to 0
Process 1: Sent TERMINATE_LELECT_GS to 2
Process 0: Received message from 1 with tag 22
Process 0: Received ELECT from 1
Process 0: Candidate for leader
Process 0: Sent TERMINATE_LELECT_GS to 1
Process 0: Sent TERMINATE_LELECT_GS to 4
Process 4: Received message from 0 with tag 24
Process 4: Received TERMINATE_LELECT_GS from 0
Process 3: Received message from 1 with tag 24
Process 3: Received TERMINATE_LELECT_GS from 1
Process 2: Received message from 1 with tag 24
Process 2: Received TERMINATE_LELECT_GS from 1
Process 0: Terminated
Process 0: Sending LELECT_GS_DONE to coordinator
Process 3: Terminated
Process 4: Terminated
Process 1: Terminated
Process 1: Sending LELECT_GS_DONE to coordinator
process 10 waiting at barrier
Process 2: Terminated

Two candidates arise but despite me trying to probe for an intermediate termination message so that I can break the tie early, it does not work but I cannot figure out the reason why.


Solution

  • I did manage to land on an implementation of the said algorithm that seems to be working properly now. Here is the code if anyone is interested.

    P.S. Please do let me know if you see any bugs!

    void perform_gs_leader_election(int coordinator_rank, int rank, MPI_Comm comm) {
        MPI_Status status;
        int leader_rank = -1;
        int received_count = 0;
        int neighbor_received_from[num_of_neighbors];
        for (int i = 0; i < num_of_neighbors; i++) neighbor_received_from[i] = 0;
        int remaining_neighbor = -1;
    
        printf("Process %d: Starting leader election\n", rank);
    
        // Determine if the process is a leaf node
        const int is_leaf = (num_of_neighbors == 1);
        printf("Process %d: Is leaf? %d\n", rank, is_leaf);
    
        // Send initial <ELECT> messages only if the process is a leaf node
        if (is_leaf) {
            MPI_Send(&rank, 1, MPI_INT, neighbor_gs[0], ELECT, comm);
            printf("Leaf Process %d: Sent ELECT to %d\n", rank, neighbor_gs[0]);
        }
    
        do {
            int sender_rank;
            while (received_count < num_of_neighbors - 1) {
                MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
    
                if (status.MPI_TAG == ELECT) {
                    for (int i = 0; i < num_of_neighbors; i++) {
                        if (neighbor_gs[i] == status.MPI_SOURCE) {
                            neighbor_received_from[i] = 1;
                        }
                    }
                    received_count++;
                    printf("Process %d got ELECT from %d (replies = %d)\n", rank, status.MPI_SOURCE, received_count);
                    if (received_count == num_of_neighbors - 1) break;
                }
    
                if (status.MPI_TAG == TERMINATE_LELECT_GS) {
                    // leader has been found, terminate and notify all neighbors by sending leader rank
                    leader_rank = sender_rank;
                    printf("Process %d got TERMINATE from %d with leader %d\n", rank, status.MPI_SOURCE, sender_rank);
                    for (int i = 0; i < num_of_neighbors; i++) {
                        printf("Process %d propagating leader to process %d\n", rank, neighbor_gs[i]);
                        if(neighbor_gs[i] != status.MPI_SOURCE) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
                    }
                    break;
                }
            }
    
            if (leader_rank != -1) break; // got terminate while still sending elects, terminate early
    
            printf("iterating for remaining neighbor\n");
            // we have received replies from all neighbors but one
            for (int i = 0; i < num_of_neighbors; i++) {
                printf("considering %d with found %d\n", neighbor_gs[i], neighbor_received_from[i]);
                if (!neighbor_received_from[i]) {
                    printf("remaining neighbor is %d\n", neighbor_gs[i]);
                    remaining_neighbor = neighbor_gs[i];
                    break;
                }
            }
    
            if (remaining_neighbor != -1 && !is_leaf) {
                // if we are a leaf, don't resend
                printf("Process %d has received all replies but one (from %d)...\n", rank, remaining_neighbor);
                int flag;
                // probe to see if the remaining neighbor is sending an ELECT before we send it one
                MPI_Iprobe(remaining_neighbor, ELECT, comm, &flag, &status);
    
                if (flag) {
                    MPI_Recv(&sender_rank, 1, MPI_INT, remaining_neighbor, ELECT, comm, &status);
    
                    // we got our last elect response, so we are the leader
                    printf("Process %d is leader because it received ELECT from remaining neighbor\n", rank);
                    // note: could update count and received neighbors but no need
                    leader_rank = rank;
                    // send out our rank to all neighbors
                    for (int j = 0; j < num_of_neighbors; j++) {
                        printf("Process %d propagating winner (us) to %d\n", rank, neighbor_gs[j]);
                        MPI_Send(&rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
                    }
                    break;
                } else {
                    // else we send to the last neighbor
                    printf("Process %d, no incoming last elect from probe, sending elect to last neighbor\n", rank);
                    MPI_Send(&rank, 1, MPI_INT, remaining_neighbor, ELECT, comm);
                }
            }
    
            // have sent to all neighbors, wait for outcome
            printf("Process %d has sent all elects, now waiting for final event\n", rank);
            MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
    
            if (status.MPI_TAG == ELECT && status.MPI_SOURCE == remaining_neighbor) {
                // probe for terminate in case someone else was elected after the elect we received was sent
                int flag = 0;
                MPI_Iprobe(MPI_ANY_SOURCE, TERMINATE_LELECT_GS, comm, &flag, &status);
    
                if (flag) {
                    // another process was declared leader faster, abort
                    MPI_Recv(&leader_rank, 1, MPI_INT, MPI_ANY_SOURCE, TERMINATE_LELECT_GS, comm, &status);
                    printf("Process %d got terminate from %d because another process won first!\n", rank,
                           status.MPI_SOURCE);
                    for (int j = 0; j < num_of_neighbors; j++) {
                        printf("Process %d sending terminate to %d\n", rank, neighbor_gs[j]);
                        if(neighbor_gs[j] != status.MPI_SOURCE) MPI_Send(&sender_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
                    }
                    break;
                }
    
    
                // means we got our last elect but after we sent out an elect to the remaining neighbor
                // incoming elect right after we sent elect the same way, so contest
                if (rank > sender_rank) {
                    printf("Process %d got elect on same edge from %d and won\n", rank, sender_rank);
                    // we win the fight and are the leader
                    leader_rank = rank;
                    for (int j = 0; j < num_of_neighbors; j++) {
                        printf("Process %d propagating ourself (winner) to %d\n", rank, neighbor_gs[j]);
                        if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
                    }
                    break;
                } else {
                    // sender has won and is leader
                    printf("Process %d got elect on same edge and lost fight. winner was %d\n", rank,
                           status.MPI_SOURCE);
                    leader_rank = status.MPI_SOURCE;
                    for (int j = 0; j < num_of_neighbors; j++) {
                        printf("Process %d propagating other process/winner to %d\n", rank, neighbor_gs[j]);
                        if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
                    }
                    break;
                }
            } else if (status.MPI_TAG == TERMINATE_LELECT_GS) {
                // we got terminate signal, notify neighbors
                leader_rank = sender_rank;
                printf("Process %d got terminate from %d with leader being %d!\n", rank, status.MPI_SOURCE, leader_rank);
                for (int j = 0; j < num_of_neighbors; j++) {
                    if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
                }
                break;
            }
        } while (1);
        printf("Process %d exited loop with leader being %d\n", rank, leader_rank);
        MPI_Barrier(comm);
        if (rank == leader_rank) {
            printf("Leader sending done to coordinator\n");
            MPI_Send(&rank, 1, MPI_INT, coordinator_rank, LELECT_GS_DONE, MPI_COMM_WORLD);
        }
    }