Search code examples
c++macosc++11concurrencygrand-central-dispatch

GCD (C++) Parallel looping Errors: Bug in Code, or Bug in Grand Central Dispatch?


The following snippet performs a set of polynomials on a vector of values read in from a TSV file, eg..

0.335832971253701   0.111283951925475
0.28470219633399    0.237865566116303
0.936298227948222   0.00759627336105169
0.441882043347137   0.744594979690941
0.811153553271307   0.906033395660231
#include <iostream>
#include <fstream>
#include <vector>
#include <cmath>
#include <sstream>
#include <dispatch/dispatch.h>

void load_problem(const std::string file, std::vector<std::pair<double,double>>& repo) {
    repo.clear();
    std::ifstream ifs(file);
    if (ifs.is_open()) {
        std::string line;
        while(getline(ifs, line)) {
            double x= std::nan("");
            double y= std::nan("");
            std::istringstream istr(line);
            istr >> std::skipws >> x >> y;
            if (!isnan(x) && !isnan(y)) {
                repo.push_back({x, y});
            };
        }
        ifs.close();
    }
  }

double do_work(const std::vector<std::pair<double,double>>& problem,const std::vector<double>& args, const size_t psz, size_t i) {
    double score = 0.0;
    for (__block size_t j=0; j < psz; j++) {
        score += args[j]*pow(problem[i].first,psz - i);
    }
    score += args[psz];
    return score;
}

int main() {
    // n-factor polynomial - test against a given problem 
    // provided as a set of tab-delimited x y values in 2d.txt
    std::vector<std::pair<double,double>> problem;
    const std::vector<double> args = {0.653398943958799,0.575258222088993,-5.54870756928019,-3.56273265353563,12.4189944179562,1.53213505629763,-4.09124685229838,5.7925805708932};

    load_problem("2d.tsv",problem); // tab-delimited doubles representing x, y.
    
    const size_t psz = args.size() - 1;

    __block double gcd_accumulator = 0.0;
    dispatch_apply(problem.size(), dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0), ^(size_t i){
        gcd_accumulator += do_work(problem, args, psz, i);
    });
        
    double for_accumulator = 0.0;
    for (size_t i=0; i < problem.size(); i++) {
        for_accumulator += do_work(problem, args, psz, i);
    }
    std::cout << gcd_accumulator << std::endl;
    std::cout << for_accumulator << std::endl;

}

With a small number of vectors (eg, 5 rows) The result is most often a pair of numbers:

31.5181
31.5181

But with a vector of 2000 xy values, the results from the GCD are always incorrect: eg

9701.44
11589.8

All I am doing is changing the number of entries in the TSV file.

My guess is that something is happening in dispatch_apply, or that I am suffering from extreme ignorance.

Note that the accepted answer to question 28657106 uses the approach mentioned here.

Mike Vine suggests using atomic. Seemed sensible. Implementation requires using a pointer (otherwise, when using __block, "Call to implicitly-deleted copy constructor of 'std::atomic'")

    std::atomic<double> gcd_a = 0;
    __block std::atomic<double>* gcd_accumulator = &gcd_a;
    dispatch_apply(problem.size(), dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0), ^(size_t i){
        *gcd_accumulator = *gcd_accumulator + do_work(problem, args, psz, i);
    });

Results are still wildly wrong:

3476.98 <--- GCD response
11589.8 <--- Correct for_to response.

However, it does seem to be an issue with the accumulation within the scope of the block. If I use a vector for the returned values, and then add that afterwards, there is no problem at all.

    __block std::vector<double> answers;
    answers.reserve(problem.size());
    
    dispatch_apply(problem.size(), dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0), ^(size_t i){
        answers[i] = do_work(problem, args, psz, i);
    });
    double gcd_accumulator = 0;
    for (size_t i=0; i < problem.size(); i++) {
        gcd_accumulator += answers[i];
    }

Solution

  • Thanks to Mike Vine for pointing out that __block does NOT provide thread safety. Both of the following work to resolve the problem (which was caused by a lack of thread safety on the accumulator).

    First of all, using a mutex.

    #include <mutex>
    
        std::mutex acc_mutex;
        __block std::mutex* m_ptr = &acc_mutex;
        __block double gcd_acc = 0;
        dispatch_apply(problem.size(), dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0), ^(size_t i){
            double value = do_work(problem, args, psz, i);
            std::scoped_lock lock(*m_ptr);  // only lock the accumulator.
            gcd_mutex_accumulator += value;
    
        });
    
    

    Or by using a vector and post-processing:

        __block std::vector<double> answers;
        answers.reserve(problem.size());
        dispatch_apply(problem.size(), dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0), ^(size_t i){
            answers[i] = do_work(problem, args, psz, i);
        });
        double gcd_vector_accumulator = 0;
        for (size_t i=0; i < problem.size(); i++) {
            gcd_vector_accumulator += answers[i];
        }