Search code examples
c++performanceparallel-processingbioinformaticshpx

Performance issue using HPX for parallelization in C++ code


I am trying to parallelize my code using HPX in order to improve performance. Below is the original code and my attempt to refactor it using HPX.

Original Code:

std::vector<std::vector<std::pair<int_t, int_t>>> find_mem(std::vector<std::string> data){
    std::string output = "";
    Timer timer;
    uint_t n = 0;
    unsigned char* concat_data = concat_strings(data, n); 
    uint_t *SA = NULL;
    SA = (uint_t*) malloc(n*sizeof(uint_t));
    int_t *LCP = NULL;
    LCP = (int_t*) malloc(n*sizeof(int_t));
    int32_t *DA = NULL;
    DA = (int32_t*) malloc(n*sizeof(int32_t));

    timer.reset();
    gsacak((unsigned char *)concat_data, (uint_t*)SA, LCP, DA, n);
    double suffix_construction_time = timer.elapsed_time();

    timer.reset();
    int_t min_mem_length = global_args.min_mem_length;
    int_t min_cross_sequence = ceil(global_args.min_seq_coverage * data.size());
    std::vector<uint_t> joined_sequence_bound;
    uint_t total_length = 0;
    for (uint_t i = 0; i < data.size(); i++) {
        joined_sequence_bound.push_back(total_length);
        total_length += data[i].length() + 1;
    }
    // Find all intervals with an LCP >= min_mem_length and <= min_cross_sequence
    std::vector<std::pair<uint_t, uint_t>> intervals = get_lcp_intervals(LCP, min_mem_length, min_cross_sequence, n);

    free(LCP);

    uint_t interval_size = intervals.size();

    std::vector<mem> mems;
    mems.resize(interval_size);
    // Convert each interval to a MEM in parallel
    IntervalToMemConversionParams* params = new IntervalToMemConversionParams[interval_size];
#if (defined(__linux__))
    threadpool pool;
    threadpool_init(&pool, global_args.thread);
    for (uint_t i = 0; i < interval_size; i++) {
        params[i].SA = SA;
        params[i].DA = DA;
        params[i].interval = intervals[i];
        params[i].concat_data = concat_data;
        params[i].result_store = mems.begin() + i;
        params[i].min_mem_length = min_mem_length;
        params[i].joined_sequence_bound = joined_sequence_bound;

        threadpool_add_task(&pool, interval2mem, params+i);
    }
    threadpool_destroy(&pool);
#else
#pragma omp parallel for num_threads(global_args.thread)
    for (uint_t i = 0; i < interval_size; i++) {
        params[i].SA = SA;
        params[i].DA = DA;
        params[i].interval = intervals[i];
        params[i].concat_data = concat_data;
        params[i].result_store = mems.begin() + i;
        params[i].min_mem_length = min_mem_length;
        params[i].joined_sequence_bound = joined_sequence_bound;
        interval2mem(params + i);
    }
#endif

    if (mems.size() <= 0 && global_args.verbose) {
        output = "Warning: There is no MEMs, please adjust your paramters.";
        print_table_line(output);
       
    }

    // Sort the MEMs based on their average positions and assign their indices
    sort_mem(mems, data);

    free(SA);
    free(DA);
    free(concat_data);
    delete[] params;

    uint_t sequence_num = data.size();
    std::vector<std::vector<std::pair<int_t, int_t>>> split_point_on_sequence;
    if (global_args.filter_mode == "global") {
        split_point_on_sequence = filter_mem_fast(mems, sequence_num);
    }
    else {
        split_point_on_sequence = filter_mem_accurate(mems, sequence_num);
    }
    
    global_args.avg_file_size = (n / (split_point_on_sequence[0].size() + 1)) / pow(2, 20);
    double mem_process_time = timer.elapsed_time();
   return split_point_on_sequence;
}

My Attempt Using HPX:

std::vector<std::vector<std::pair<int_t, int_t>>> find_mem(std::vector<std::string> data) {
    Timer timer;
    uint_t n = 0;
    // Task 1: string concatenation
    auto concat_task = hpx::dataflow(hpx::launch::async, [&]() {
        return concat_strings(data, n);
    });

    // Task 2: Suffix Array and LCP
    auto gsacak_task = hpx::dataflow(hpx::launch::async, [&](auto concat_data_future) {
        unsigned char* concat_data = concat_data_future.get();
        uint_t *SA = NULL;
        SA = (uint_t*) malloc(n*sizeof(uint_t));
        int_t *LCP = NULL;
        LCP = (int_t*) malloc(n*sizeof(int_t));
        int32_t *DA = NULL;
        DA = (int32_t*) malloc(n*sizeof(int32_t));

        timer.reset();
        gsacak((unsigned char *)concat_data, (uint_t*)SA, LCP, DA, n);
        return std::make_tuple(SA, LCP, DA, concat_data, n);
    }, concat_task);

    // Task 3: LCP Intervals
    auto intervals_task = hpx::dataflow(hpx::launch::async, [&](auto gsacak_result) {
        auto [SA, LCP, DA, concat_data, n] = gsacak_result.get();
        
        timer.reset();
        int_t min_mem_length = global_args.min_mem_length;
        int_t min_cross_sequence = ceil(global_args.min_seq_coverage * data.size());
        auto intervals = get_lcp_intervals(LCP, min_mem_length, min_cross_sequence, n);
        free(LCP); // LCP não é mais necessário
        return std::make_tuple(intervals, SA, DA, concat_data);
    }, gsacak_task);

    // Task 4: Intervals to MEM
    auto mems_task = hpx::dataflow(hpx::launch::async, [&](auto intervals_result) {
        auto [intervals, SA, DA, concat_data] = intervals_result.get();

        uint_t interval_size = intervals.size();
        std::vector<mem> mems(interval_size);
        int_t min_mem_length = global_args.min_mem_length;

        std::vector<uint_t> joined_sequence_bound;
        uint_t total_length = 0;
        for (auto& seq : data) {
            joined_sequence_bound.push_back(total_length);
            total_length += seq.length() + 1;
        }

        hpx::experimental::for_loop(hpx::execution::par, 0, interval_size, [&](int i) {
            IntervalToMemConversionParams params{
                .SA = SA,
                .DA = DA,
                .concat_data = concat_data,
                .result_store = mems.begin() + i,
                .min_mem_length = min_mem_length,
                .interval = intervals[i],
                .joined_sequence_bound = joined_sequence_bound
            };
            interval2mem(&params);
        });

        free(SA);
        free(DA);
        free(concat_data);
        return mems;
    }, intervals_task);

    // Task 5: MEM Filtering
    auto filter_task = hpx::dataflow(hpx::launch::async, [&](auto mems_future) {
        auto mems = mems_future.get();
        sort_mem(mems, data);
        return (global_args.filter_mode == "global")
            ? filter_mem_fast(mems, data.size())
            : filter_mem_accurate(mems, data.size());
    }, mems_task);


    // Final Result
    auto split_point_on_sequence = filter_task.get();
    global_args.avg_file_size = (n / (split_point_on_sequence[0].size() + 1)) / (1 << 20);
    return split_point_on_sequence;
}

Issue:

My goal is to parallelize parts of this function using HPX, hoping for performance improvement. However, after refactoring, I noticed that the gsacak function (which is written in C and cannot be modified) takes significantly longer to execute. In the original code, it takes approximately 0.99 seconds to execute and completes the function in 1.012 seconds. After my changes, the execution time for gsacak has increased to 2.29 seconds, and the function finishes in 2.31 seconds.

Questions:

Am I approaching this incorrectly? I am using HPX to parallelize parts of the code, but the performance gain is not as expected. What can I do to make the execution time of the refactored version comparable to the original code, since I have not modified the gsacak function, only the way it is called?

Context:

I am learning to work with HPX/C++ and need assistance with parallelization in this context. The performance degradation is only observed with the change in how tasks are scheduled using HPX; the original code works fine without parallelization. I would appreciate any suggestions or insights on how to improve the performance of my refactored code.


Solution

  • If I understand your HPX code correctly, you're creating five tasks that are interdependent on each other (task2 depends on task1's result, task3 depends on task2's result, etc.). That prevents any parallelization from happening as the respective next task will only start running once the predecessor task has produced its result (is finished running).