c++ multithreading optimization thread-safety false-sharing

Workaround for writing to different entries of the same vector from multiple threads


I already described a similar problem, but only to understand its causes. If this counts as a duplicate as well, I will remove the question.

I work on a problem which can be thought of as a sort of shortest path computation in a really big graph. On this graph I solve an all-to-all shortest path problem, i.e., for each node n I find every shortest path having n as its source.

In order to compute the entire solution quickly I'm using multithreading. What I do is simply divide the set of nodes between the threads and compute the shortest paths in parallel. The problem comes when I save the solution.

size_t sz = all_nodes.size();
size_t np = config.n_threads;
size_t part = sz / np;
vector<vector<int>> solutions(np);

The vector containing ALL the solutions is a vector of vectors, and each thread works on a different entry of the vector solutions.

auto paraTask = [&](size_t start, size_t end, vector<int> &sol) {
    for (size_t l = start; l < end; ++l)
        fun({all_nodes[l]}, sol);
};
for (size_t i = 0; i < np; i++) {
    size_t start = i * part;
    size_t length = (i + 1 == np) ? sz - i * part : part;
    threads[i] = std::thread(paraTask, start, start + length, std::ref(solutions[i]));
}

The function fun writes to the entry of solutions it is given.

At the end I join the threads:

for (auto &&thread: threads) thread.join();
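
For anyone who wants to reproduce the setup, the fragments above can be condensed into the following self-contained sketch. The names `fun_stub` and `run_partitioned` are hypothetical; `fun_stub` only stands in for the real `fun`, whose actual work is not reproduced here.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for `fun`: appends one value per node to `sol`.
void fun_stub(int node, std::vector<int>& sol) {
    sol.push_back(node * 2);
}

// Splits `all_nodes` into `np` contiguous chunks, one thread per chunk.
// The last chunk absorbs the remainder when sz is not divisible by np.
std::vector<std::vector<int>> run_partitioned(const std::vector<int>& all_nodes,
                                              std::size_t np) {
    assert(np > 0);
    const std::size_t sz = all_nodes.size();
    const std::size_t part = sz / np;
    std::vector<std::vector<int>> solutions(np);
    std::vector<std::thread> threads;
    threads.reserve(np);

    auto paraTask = [&all_nodes](std::size_t start, std::size_t end,
                                 std::vector<int>& sol) {
        for (std::size_t l = start; l < end; ++l)
            fun_stub(all_nodes[l], sol);
    };

    for (std::size_t i = 0; i < np; ++i) {
        const std::size_t start = i * part;
        const std::size_t length = (i + 1 == np) ? sz - start : part;
        threads.emplace_back(paraTask, start, start + length,
                             std::ref(solutions[i]));
    }
    for (auto& t : threads) t.join();
    return solutions;
}
```

Calling `run_partitioned(nodes, 3)` on seven nodes gives chunks of 2, 2 and 3 nodes, which is the same split the `length` expression in the question produces.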

What happens is that the computation time improves going from 1 to 2 to 4 threads, but then it starts increasing again. I have 8 physical cores and 16 logical ones. My understanding is that this is due to the vectors (the entries of solutions) being resized, which causes neighbouring threads to stall during the resizing.

I tried profiling the code and looking at the cache misses, and there are around 4 million misses (I am really not sure how to read this information). I therefore tried using std::list<vector<int>> solutions, hoping this would overcome the problem related to the contiguous memory of vectors, but here too the computation time starts to increase again beyond 4 threads.

How can I solve this problem?

PS: as a 'stupid' solution I tried hard-coding the individual threads: I fix 8 threads and create a separate vector for each of them, so that they are independent of one another and are no longer entries of a vector of vectors. The same thing happens here too: the time increases compared with 4 threads.
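
One way to express the independent-buffer idea from the PS is sketched below: each worker fills a vector local to its own thread and touches the shared container only once, when it moves the finished buffer into its slot. This is a sketch under assumptions, not the code from the question: `run_with_local_buffers` is a hypothetical name and the loop body is placeholder work. Given what the PS reports, this change alone may well not alter the timing.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <utility>
#include <vector>

// Each worker fills a vector that lives on its own stack and only touches
// the shared `solutions` container once, to move the finished buffer in.
std::vector<std::vector<int>> run_with_local_buffers(std::size_t n_items,
                                                     std::size_t np) {
    assert(np > 0);
    std::vector<std::vector<int>> solutions(np);
    std::vector<std::thread> threads;
    threads.reserve(np);
    const std::size_t part = n_items / np;

    for (std::size_t i = 0; i < np; ++i) {
        const std::size_t start = i * part;
        const std::size_t end = (i + 1 == np) ? n_items : start + part;
        threads.emplace_back([&solutions, i, start, end] {
            std::vector<int> local;
            local.reserve(end - start);  // one allocation up front
            for (std::size_t l = start; l < end; ++l)
                local.push_back(static_cast<int>(l));  // placeholder work
            solutions[i] = std::move(local);  // single write to the shared slot
        });
    }
    for (auto& t : threads) t.join();
    return solutions;
}
```

Distinct threads write to distinct elements of `solutions`, so no locking is needed for the final move.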

Any guess?

EDIT: Here is a really small, simplified example of the function fun:

void fun(const Config &config, const vector<size_t> &points,
                     vector<int> &solution) const {
            // Marked stops
            unordered_set<size_t> marked_stops;
            unordered_set<size_t> new_marked_stops;

            size_t n_stops = stops.size();

            int max_k = config.max_k;
            vector<vector<int>> tau;
            tau.reserve(max_k);

            for (int k = 0; k < max_k; ++k) {
                tau.emplace_back(n_stops, 1440 * 3);
            }

            // For each point, mark them as starting point
            for (size_t p: points) {
                logger.debug("stop: %d", p);
                marked_stops.insert(p);
                tau[0][p] = config.start_time_horizon;
                pred_stop[0][p] = p;
                pred_trip[0][p] = 0;
            }

           
            for (int k = 1; k < max_k; ++k) {
                // First stage: update tau from previous round
                for (size_t i = 0, i_max = n_stops; i < i_max; ++i) {
                    tau[k][i] = tau[k - 1][i];
                    pred_stop[k][i] = pred_stop[k - 1][i];
                    pred_trip[k][i] = pred_trip[k - 1][i];
                }

                
                for (size_t p: marked_stops) {
                    //for each line passing from p, check if I can take that line in p based on tau value 
                    //Check the line, and for each new stop encountered:
                        new_marked_stops.insert(new_stop);
                    
                }

                // Reset marked stops
                std::swap(marked_stops, new_marked_stops);
                new_marked_stops.clear();
            }

            // Print results
            for (size_t i = 0, i_max = n_stops; i < i_max; ++i) {
                int tmp_k = -1;
                int tmp_t = 1440 * 3;
                for (int k = 1; k < max_k; ++k) {
                    // Find origin of destination i
                    // (body omitted here; it updates tmp_k and tmp_t)
                }
                if (tmp_k != -1) {
                    solution.push_back(config.start_time_horizon);
                    solution.push_back(i);
                }
            }


        }

Without going into details: we have a schedule, and we check it to see whether we can take a line from a given stop and unlock new stops.

EDIT: I removed the part where the threads add information to their vectors in solutions, and the same thing still happens: the time increases beyond 4 threads.


Solution

  • If I were doing it, I wouldn't have the threads write directly to the output vector. Instead, each thread would run and generate its result, then push it to a thread-safe queue along with something to identify the thread that produced it (if that matters).

    Then a single thread would pop items from the queue, and put them in the correct place in the result vector. Keeps the data handling neat and tidy, and generally scales quite well.
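
A minimal sketch of that queue-based layout, assuming a simple mutex-and-condition-variable queue, could look like this. `Result`, `ResultQueue` and `collect` are hypothetical names introduced for illustration, the per-node work is a placeholder, and results are tagged with the node index rather than the thread id.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// One finished unit of work: which node produced it, plus its data.
struct Result {
    std::size_t node;
    std::vector<int> data;
};

// Minimal mutex-protected queue (hypothetical helper, not a library type).
class ResultQueue {
public:
    void push(Result r) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(std::move(r));
        }
        cv_.notify_one();
    }
    Result pop() {  // blocks until an item is available
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        Result r = std::move(q_.front());
        q_.pop();
        return r;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<Result> q_;
};

// Workers compute into private buffers; only the calling thread
// writes into the final result vector.
std::vector<std::vector<int>> collect(std::size_t n_nodes, std::size_t np) {
    assert(np > 0);
    ResultQueue queue;
    std::vector<std::thread> workers;
    for (std::size_t w = 0; w < np; ++w) {
        workers.emplace_back([&queue, w, np, n_nodes] {
            // Strided split: worker w handles nodes w, w+np, w+2*np, ...
            for (std::size_t n = w; n < n_nodes; n += np) {
                std::vector<int> local{static_cast<int>(n),
                                       static_cast<int>(n * n)};  // placeholder work
                queue.push({n, std::move(local)});
            }
        });
    }
    std::vector<std::vector<int>> results(n_nodes);
    for (std::size_t i = 0; i < n_nodes; ++i) {  // exactly one result per node
        Result r = queue.pop();
        results[r.node] = std::move(r.data);
    }
    for (auto& t : workers) t.join();
    return results;
}
```

The consumer here knows in advance how many results to expect, which avoids needing a separate "done" signal. If that count is not known up front, a sentinel item or a closed flag on the queue would be one way to end the loop.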