Search code examples
cperformanceparallel-processingmpi

Improving MPI performance


Hello I have this problem, I have an MPI program in c that works perfectly fine, but the thing is, I believe it could be improved since the MPI_Allgather() function makes every node wait for the slowest one, which I think could be improved, here's the code:

void update_global_cache(Diccionario *global_cache, Diccionario *new_entries) {
    merge_dictionaries(global_cache, new_entries);
}

int main(int argc, char *argv[]) {
    int rank, size;
    double start_time, end_time;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    start_time = MPI_Wtime(); // Start timer

    Point *points = NULL;
    int num_points = 0, dimensions = 0;
    Diccionario *global_cache = create_table();

    load_data("ciudades_a_analizar.csv", &points, &num_points, &dimensions);
    printf("Process %d: Data loaded: %d points with %d dimensions each.\n", rank, num_points, dimensions);

    int bloque_trabajo = num_points / size;
    int restante = num_points % size;
    int work_done = rank * bloque_trabajo + (rank < restante ? rank : restante);
    int end = work_done + bloque_trabajo - 1 + (rank < restante ? 1 : 0);

    struct timespec start_time_work, end_time_work;
    clock_gettime(CLOCK_MONOTONIC, &start_time_work);

    for (int i = work_done; i <= end; i++) {
        struct timespec iteration_start, iteration_end;
        clock_gettime(CLOCK_MONOTONIC, &iteration_start);

        printf("Nodo %d: estoy en: %d, final: %d\n", rank, i, end);
        Diccionario *new_cache = create_table();
        int resultado = KNN(points[i], global_cache, new_cache);
        printf("Nodo %d: termino mi knn\n", rank);

        char *serialized_dict = serialize_dict(new_cache);
        int local_serialized_size = calculate_serialized_size(new_cache);

        if (rank == MASTER_NODE) {
            int *serialized_sizes = (int *)malloc(size * sizeof(int));
            MPI_Gather(&local_serialized_size, 1, MPI_INT, serialized_sizes, 1, MPI_INT, MASTER_NODE, MPI_COMM_WORLD);

            int *displs = (int *)malloc(size * sizeof(int));
            displs[0] = 0;
            for (int j = 1; j < size; j++) {
                displs[j] = displs[j - 1] + serialized_sizes[j - 1];
            }
            int total_size = displs[size - 1] + serialized_sizes[size - 1];

            char *all_serialized_dicts = (char *)malloc(total_size);
            MPI_Gatherv(serialized_dict, local_serialized_size, MPI_BYTE, all_serialized_dicts, serialized_sizes, displs, MPI_BYTE, MASTER_NODE, MPI_COMM_WORLD);

            // Procesar todos los diccionarios en el nodo maestro
            Diccionario *final = create_table();
            for (int j = 0; j < size; j++) {
                char *buffer = all_serialized_dicts + displs[j];
                Diccionario *dict = deserialize_dict(buffer, serialized_sizes[j]);
                merge_dictionaries(final, dict);
                free_table(dict);
            }

            // Enviar el diccionario global actualizado a todos los nodos
            char *global_serialized_dict = serialize_dict(final);
            int global_serialized_size = calculate_serialized_size(final);

            MPI_Bcast(&global_serialized_size, 1, MPI_INT, MASTER_NODE, MPI_COMM_WORLD);
            MPI_Bcast(global_serialized_dict, global_serialized_size, MPI_BYTE, MASTER_NODE, MPI_COMM_WORLD);

            free(all_serialized_dicts);
            free(serialized_sizes);
            free(displs);
            free(global_serialized_dict);
            free_table(final);
        } else {
            // Enviar el diccionario local al nodo maestro
            MPI_Gather(&local_serialized_size, 1, MPI_INT, NULL, 1, MPI_INT, MASTER_NODE, MPI_COMM_WORLD);
            MPI_Gatherv(serialized_dict, local_serialized_size, MPI_BYTE, NULL, NULL, NULL, MPI_BYTE, MASTER_NODE, MPI_COMM_WORLD);

            // Esperar a recibir el diccionario global actualizado
            int global_serialized_size;
            MPI_Bcast(&global_serialized_size, 1, MPI_INT, MASTER_NODE, MPI_COMM_WORLD);

            char *global_serialized_dict = (char *)malloc(global_serialized_size);
            MPI_Bcast(global_serialized_dict, global_serialized_size, MPI_BYTE, MASTER_NODE, MPI_COMM_WORLD);

            Diccionario *final = deserialize_dict(global_serialized_dict, global_serialized_size);
            merge_dictionaries(global_cache, final);
            free(global_serialized_dict);
            free_table(final);
        }

        // Liberar memoria
        free(serialized_dict);
        free_table(new_cache);

        clock_gettime(CLOCK_MONOTONIC, &iteration_end);
        double time_spent_iteration = (iteration_end.tv_sec - iteration_start.tv_sec) + 
                                      (iteration_end.tv_nsec - iteration_start.tv_nsec) / 1e9;
        printf("Process %d: Time for iteration %d: %f seconds\n", rank, i, time_spent_iteration);
    }

    clock_gettime(CLOCK_MONOTONIC, &end_time_work);
    double time_spent_work = (end_time_work.tv_sec - start_time_work.tv_sec) + 
                             (end_time_work.tv_nsec - start_time_work.tv_nsec) / 1e9;

    if (rank == MASTER_NODE) {
        end_time = MPI_Wtime(); // End timer
        double time_spent_total = end_time - start_time;
        printf("Process 0: Total computation time: %f seconds\n", time_spent_total);
        printf("Process 0: Total work time: %f seconds\n", time_spent_work);
    }

    free(global_cache);
    MPI_Finalize();
    return 0;
}

it's a lot of lines but it basically loads a CSV into an array (each node loads it's own matrix, I know I can load the CSV data in one node and then share it with Bcast but I had tons of problems trying to do that so I left it this way), and then splits the array into equal parts for each node, then, each node executes KNN(), which is a program defined in knn.c (doesn't really matter in this problem) and then they return a new_cache with the results of the calculation done in KNN(), so in case I need to re-do those calculations, I already have the result in the cache and avoid redundancy, each node must share their new_cache findings with every other node to make it more efficient, the thing is, each node has to wait in the MPI_Allgather() function for the rest of the nodes, and KNN() is a very demanding function (about 60 seconds, it's made on purpose), making the nodes wait for the slowest one doesn't seem like a good strategy so I'm trying to find a way to make it asynchronous, if someone has an idea on how to implement this in a different manner I'm glad to hear it, it's my first MPI project so I'm not really an expert with MPI, thanks for the help.

Tried to implement a master node, that recollects the dictionaries of the nodes asynchronously and then Bcasts them, but it didn't really work, I still don't know why, as I said before, I'm still new to this library so I'm no expert. It might be easier than I think but I'm really having a hard time with this.

Just for clarification, the dictionary Diccionario is implemented in defs.h made by me, this is a little High Performance Computing project I have to do for my university subject


Solution

  • You are using a Gather / computation / broadcast scheme. You could do an Allgather and then have every process do the computation redundantly. That will be better, unless the gathered array is enormous in size. Otherwise it will halve your MPI time, approximately.

    You have two gathers of unrelated quantities. You can replace that by two Igather calls followed by MPI_Waitany. You then process whichever gather finishes first. EDIT this does not seem to be the case here, but I'll let this paragraph stand as a possible optimization for other cases.

    As remarked in a comment, you might even be able to collapse the gather/compute into a reduction. That will be a space and time savings, but you should only worry about that if the gathered array is enormous.