I am trying to parallelize my code using HPX in order to improve performance. Below is the original code and my attempt to refactor it using HPX.
Original Code:
std::vector<std::vector<std::pair<int_t, int_t>>> find_mem(std::vector<std::string> data){
std::string output = "";
Timer timer;
uint_t n = 0;
unsigned char* concat_data = concat_strings(data, n);
uint_t *SA = NULL;
SA = (uint_t*) malloc(n*sizeof(uint_t));
int_t *LCP = NULL;
LCP = (int_t*) malloc(n*sizeof(int_t));
int32_t *DA = NULL;
DA = (int32_t*) malloc(n*sizeof(int32_t));
timer.reset();
gsacak((unsigned char *)concat_data, (uint_t*)SA, LCP, DA, n);
double suffix_construction_time = timer.elapsed_time();
timer.reset();
int_t min_mem_length = global_args.min_mem_length;
int_t min_cross_sequence = ceil(global_args.min_seq_coverage * data.size());
std::vector<uint_t> joined_sequence_bound;
uint_t total_length = 0;
for (uint_t i = 0; i < data.size(); i++) {
joined_sequence_bound.push_back(total_length);
total_length += data[i].length() + 1;
}
// Find all intervals with LCP >= min_mem_length that cross at least min_cross_sequence sequences
std::vector<std::pair<uint_t, uint_t>> intervals = get_lcp_intervals(LCP, min_mem_length, min_cross_sequence, n);
free(LCP);
uint_t interval_size = intervals.size();
std::vector<mem> mems;
mems.resize(interval_size);
// Convert each interval to a MEM in parallel
IntervalToMemConversionParams* params = new IntervalToMemConversionParams[interval_size];
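// On Linux, hand the conversions to a custom thread pool; otherwise fall back to OpenMP.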
#if (defined(__linux__))
threadpool pool;
threadpool_init(&pool, global_args.thread);
for (uint_t i = 0; i < interval_size; i++) {
params[i].SA = SA;
params[i].DA = DA;
params[i].interval = intervals[i];
params[i].concat_data = concat_data;
params[i].result_store = mems.begin() + i;
params[i].min_mem_length = min_mem_length;
params[i].joined_sequence_bound = joined_sequence_bound;
threadpool_add_task(&pool, interval2mem, params+i);
}
threadpool_destroy(&pool);
#else
#pragma omp parallel for num_threads(global_args.thread)
for (uint_t i = 0; i < interval_size; i++) {
params[i].SA = SA;
params[i].DA = DA;
params[i].interval = intervals[i];
params[i].concat_data = concat_data;
params[i].result_store = mems.begin() + i;
params[i].min_mem_length = min_mem_length;
params[i].joined_sequence_bound = joined_sequence_bound;
interval2mem(params + i);
}
#endif
if (mems.size() <= 0 && global_args.verbose) {
output = "Warning: There is no MEMs, please adjust your paramters.";
print_table_line(output);
}
// Sort the MEMs based on their average positions and assign their indices
sort_mem(mems, data);
free(SA);
free(DA);
free(concat_data);
delete[] params;
uint_t sequence_num = data.size();
std::vector<std::vector<std::pair<int_t, int_t>>> split_point_on_sequence;
if (global_args.filter_mode == "global") {
split_point_on_sequence = filter_mem_fast(mems, sequence_num);
}
else {
split_point_on_sequence = filter_mem_accurate(mems, sequence_num);
}
global_args.avg_file_size = (n / (split_point_on_sequence[0].size() + 1)) / pow(2, 20);
double mem_process_time = timer.elapsed_time();
return split_point_on_sequence;
}
My Attempt Using HPX:
std::vector<std::vector<std::pair<int_t, int_t>>> find_mem(std::vector<std::string> data) {
Timer timer;
uint_t n = 0;
// Task 1: string concatenation
auto concat_task = hpx::dataflow(hpx::launch::async, [&]() {
return concat_strings(data, n);
});
// Task 2: Suffix Array and LCP
auto gsacak_task = hpx::dataflow(hpx::launch::async, [&](auto concat_data_future) {
unsigned char* concat_data = concat_data_future.get();
uint_t *SA = NULL;
SA = (uint_t*) malloc(n*sizeof(uint_t));
int_t *LCP = NULL;
LCP = (int_t*) malloc(n*sizeof(int_t));
int32_t *DA = NULL;
DA = (int32_t*) malloc(n*sizeof(int32_t));
timer.reset();
gsacak((unsigned char *)concat_data, (uint_t*)SA, LCP, DA, n);
return std::make_tuple(SA, LCP, DA, concat_data, n);
}, concat_task);
// Task 3: LCP Intervals
auto intervals_task = hpx::dataflow(hpx::launch::async, [&](auto gsacak_result) {
auto [SA, LCP, DA, concat_data, n] = gsacak_result.get();
timer.reset();
int_t min_mem_length = global_args.min_mem_length;
int_t min_cross_sequence = ceil(global_args.min_seq_coverage * data.size());
auto intervals = get_lcp_intervals(LCP, min_mem_length, min_cross_sequence, n);
free(LCP); // LCP is no longer needed
return std::make_tuple(intervals, SA, DA, concat_data);
}, gsacak_task);
// Task 4: Intervals to MEM
auto mems_task = hpx::dataflow(hpx::launch::async, [&](auto intervals_result) {
auto [intervals, SA, DA, concat_data] = intervals_result.get();
uint_t interval_size = intervals.size();
std::vector<mem> mems(interval_size);
int_t min_mem_length = global_args.min_mem_length;
std::vector<uint_t> joined_sequence_bound;
uint_t total_length = 0;
for (auto& seq : data) {
joined_sequence_bound.push_back(total_length);
total_length += seq.length() + 1;
}
hpx::experimental::for_loop(hpx::execution::par, 0, interval_size, [&](int i) {
IntervalToMemConversionParams params{
.SA = SA,
.DA = DA,
.concat_data = concat_data,
.result_store = mems.begin() + i,
.min_mem_length = min_mem_length,
.interval = intervals[i],
.joined_sequence_bound = joined_sequence_bound
};
interval2mem(&params);
});
free(SA);
free(DA);
free(concat_data);
return mems;
}, intervals_task);
// Task 5: MEM Filtering
auto filter_task = hpx::dataflow(hpx::launch::async, [&](auto mems_future) {
auto mems = mems_future.get();
sort_mem(mems, data);
return (global_args.filter_mode == "global")
? filter_mem_fast(mems, data.size())
: filter_mem_accurate(mems, data.size());
}, mems_task);
// Final Result
auto split_point_on_sequence = filter_task.get();
global_args.avg_file_size = (n / (split_point_on_sequence[0].size() + 1)) / pow(2, 20); // floating-point division, as in the original
return split_point_on_sequence;
}
Issue:
My goal is to parallelize parts of this function using HPX, hoping for a performance improvement. However, after refactoring, the gsacak function (which is written in C and cannot be modified) takes significantly longer to execute. In the original code it takes approximately 0.99 seconds, and the whole function completes in 1.012 seconds. After my changes, gsacak takes 2.29 seconds and the function finishes in 2.31 seconds.
Questions:
Am I approaching this incorrectly? I am using HPX to parallelize parts of the code, but the performance gain is not what I expected. What can I do to make the refactored version's execution time comparable to the original code's, given that I have not modified gsacak itself, only the way it is called?
Context:
I am learning to work with HPX/C++ and need assistance with parallelization in this context. The performance degradation appears only after changing how the tasks are scheduled with HPX; the original code performs fine. I would appreciate any suggestions or insights on how to improve the performance of my refactored code.
If I understand your HPX code correctly, you're creating five tasks that each depend on the result of the previous one (task 2 depends on task 1's result, task 3 on task 2's, and so on). That prevents any parallelization: each task can only start running once its predecessor has finished and produced its result.
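To make this concrete, here is a minimal sketch of how I would restructure the function, assuming your helpers (concat_strings, gsacak, get_lcp_intervals, interval2mem) and globals behave as in your listing: run the inherently sequential stages inline, and keep HPX only for the one loop whose iterations are actually independent, the interval-to-MEM conversion. Wrapping a strict chain of stages in hpx::dataflow cannot create concurrency; it only adds future and scheduling overhead on top of the same sequential execution.
// Sketch only, not a drop-in replacement: stages 1-3 form a strict chain,
// so run them inline; only the conversion loop has independent iterations.
unsigned char* concat_data = concat_strings(data, n);
uint_t* SA = (uint_t*) malloc(n * sizeof(uint_t));
int_t* LCP = (int_t*) malloc(n * sizeof(int_t));
int32_t* DA = (int32_t*) malloc(n * sizeof(int32_t));
gsacak((unsigned char*) concat_data, (uint_t*) SA, LCP, DA, n);
int_t min_mem_length = global_args.min_mem_length;
int_t min_cross_sequence = ceil(global_args.min_seq_coverage * data.size());
auto intervals = get_lcp_intervals(LCP, min_mem_length, min_cross_sequence, n);
free(LCP);
std::vector<uint_t> joined_sequence_bound;
uint_t total_length = 0;
for (auto const& seq : data) {
    joined_sequence_bound.push_back(total_length);
    total_length += seq.length() + 1;
}
std::vector<mem> mems(intervals.size());
// The parallel algorithm creates and schedules the tasks itself; no manual
// dataflow bookkeeping is needed for a fork-join loop like this one.
hpx::experimental::for_loop(hpx::execution::par, std::size_t(0), mems.size(),
    [&](std::size_t i) {
        IntervalToMemConversionParams p;
        p.SA = SA;
        p.DA = DA;
        p.interval = intervals[i];
        p.concat_data = concat_data;
        p.result_store = mems.begin() + i;
        p.min_mem_length = min_mem_length;
        p.joined_sequence_bound = joined_sequence_bound;
        interval2mem(&p);
    });
This way the HPX version schedules no more tasks than your original threadpool/OpenMP variants, and hpx::dataflow stays reserved for situations where at least two tasks can actually overlap.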