Search code examples
rrcpprcppparallel

RcppParallel result changes with multiple threads


I'm new to Rcpp and RcppParallel. I'm trying to use RcppParallel to optimize my R code and now I'm making some toy codes for studying them. Now I made a RcppParallel code and the result is different from what I thought. Result changes whenever I try the function.

Here is my code

library(Rcpp)
library(RcppParallel)
library(RcppArmadillo)
library(data.table)
library(pryr)

#key<-rep(rep(c("a","b"),each=12500000))
grp<-rep(rep(c("a","b","c","d","e"),each=2500000),2)
val<-rnorm(25000000,0,8)
dat<-setDT(data.frame(grp=grp,val=val))
#raw<-setDT(data.frame(key=key,grp=grp,val=val))
na.omit(dat)
#setkey(dat,"key","grp")
setkey(dat,"grp")
#key<-as.factor(key)
grp<-as.factor(grp)
gc()

sourceCpp("test.cpp")

set.seed(1)
dist<-do.call(rbind,tapply(1:NROW(dat),as.factor(dat$grp),function(x) stats::rmultinom(1,NROW(x),rep(1/NROW(x),NROW(x)))))  

setThreadOptions(numThreads=4)
for(i in 1:10) print(test(dat,table(dat$grp),dist))
setThreadOptions(numThreads=1)
for(i in 1:10) print(test(dat,table(dat$grp),dist))

Here is the Rcpp code

#include <Rcpp.h>
#include <RcppParallel.h>

using namespace Rcpp;
using namespace RcppParallel;

// [[Rcpp::depends(RcppParallel)]]

struct INDSUM : public Worker
{
  const RVector<double> val;
  const RVector<int> dist;
  RVector<double> output;

  INDSUM(const NumericVector &_val, const IntegerVector &_dist, NumericVector &_output)
    : val(_val),dist(_dist),output(_output) {}

  void operator()(size_t begin, size_t end)
  {
    for(size_t i=begin; i< end; i++)
    {
      output[0] += val[i]*dist[i];
      output[1] += val[i]*val[i]*dist[i];
    }
  }
};

// [[Rcpp::export]]
NumericMatrix test(DataFrame &df, NumericVector &grptable, IntegerVector &dist) {

  IntegerVector idx = df[0];
  NumericVector val = df[1];
  size_t grpnum=grptable.length();

  NumericMatrix output(grpnum,2);
  NumericVector tmp(2);
  NumericVector sum(grpnum);
  NumericVector sumsq(grpnum);

  INDSUM indexsum(val, dist, tmp);

  for(size_t j=0, cnt=0 ; j<grpnum; j++)
  {
    parallelFor(cnt,cnt+grptable[j],indexsum);
    sum[j] = tmp[0];
    sumsq[j] = tmp[1];
    tmp[0]=tmp[1]=0;
    cnt+=grptable[j];

    output(j,0)=sum[j];
    output(j,1)=sumsq[j];
  }
  return output;
}

When the number of thread is 4, the result looks like this:

[1,]  -1328.307 17484320
[2,]  -8175.153 96214984
[3,] -17317.002 80623284
[4,]  -7964.977 83306470
[5,]  16800.532 82033877
           [,1]     [,2]
[1,]  -6886.349 79063639
[2,] -19529.382 80570964
[3,] -21653.201 80363894
[4,]  -3256.842 81909243
[5,]   2266.153 79906235
           [,1]     [,2]
[1,] -12306.965 80778175
[2,]   2490.576 80411474
[3,]  -6631.620 80495938
[4,]   1477.019 52269743
[5,]  -1167.497 92402507
           [,1]     [,2]
[1,] -15329.571 81025309
[2,] -10860.718 76984730
[3,]   2430.499 96612706
[4,]  17321.521 97019020
[5,]   6702.637 81961180
           [,1]     [,2]
[1,] -20691.132 80094518
[2,]  -7922.633 80567608
[3,]  -6579.185 83056898
[4,]  24761.582 80163577
[5,]  -8022.315 80314909
           [,1]     [,2]
[1,] -18693.011 80439056
[2,]  -9705.923 80580004
[3,] -20444.317 89155340
[4,]  13285.827 80487654
[5,]  14155.066 84664113
            [,1]      [,2]
[1,] -17247.0503  80775485
[2,] -15141.5544 107365503
[3,]    315.3996  91628341
[4,]   4731.3399  81038215
[5,]   8374.9257  77583065
            [,1]     [,2]
[1,]   -973.0294 81194199
[2,] -13473.3673 79293218
[3,] -11028.3987 80865948
[4,]   1126.7350 80539346
[5,]  12452.9127 79909947
           [,1]     [,2]
[1,] -18049.085 80850260
[2,] -11117.905 81232899
[3,] -12195.323 80512124
[4,]  -9120.614 80557568
[5,]   6800.466 76608380
           [,1]     [,2]
[1,] -33611.785 76090786
[2,] -10439.372 85329662
[3,]  -1847.212 92939924
[4,]  -3906.611 86959278
[5,]  -5839.027 85420969

Whereas when I use single thread, I get the result:

           [,1]      [,2]
[1,] -59155.958 319664972
[2,] -26054.697 320363256
[3,] -40476.176 320388784
[4,]   6573.581 320427866
[5,]  30657.222 319834396

Thnak you.


Solution

  • You have a race condition (also called data race in this case) in our code, since all the threads are writing to the same two element vector tmp. However, incrementing a value needs three steps:

    1. reading the value
    2. incrementing the read value
    3. writing back the incremented value

    If all threads do this in parallel without locking, one thread will write back between another thread reading and writing, throwing away the increment done by the first thread.

    One solution would be to use per-thread output variables which are collected after wards in serial code. It might also make sense to look into RcppParallel::parallelReduce, since what you are trying to do is a reduce type operation.