Search code examples
rforeachparallel-processingmultidplyr

R: Why parallel is (much) slower? What is best strategy in using parallel for a (left) join a large collection of big files?


I've read some questions on the subjects as well as some tutorials but failed to resolve my problem so decided to ask myself.

I have a large collection of big files of types say A, B, C; and I need to left join B, C with A on some conditions. I work on a remote server with 64 CPU and 240GB so naturally I'd like to use it power and process in parallel. A crucial knowledge I have is that if a_i file could be successfully joined only with b_i, b_(i+1) from B, same for C. My initial try was to have a 'join_i' function for 'a_i' file and then run it in parallel (I have 448 files). However, there was no significant time improvement and in fact as I watched performance - sadly, CPU were loaded on a very low percentage. As far as I could dig into the issue, I think the bottleneck is IO, especially because all files are big. Is it a valid hypothesis? In any case, on a second try I decided to go through each file sequentially, but use parallel advantage within each iteration. However, after numerous attempts I didn't get any luck here as well. I tried to make a minimal example below where parallel is much much slower (and in fact on my real data it freezes). What is wrong here? Is it a code mistake or some deeper misunderstanding of how parallel in R works? Also, I tried some multidplyr and mclapply but in both cases no luck either. I also want to point out, that reading files takes more than a join itself: within 1 iteration reading takes about 30 sec (I use fread, unzipping inside it through cmd) while join takes about 10 sec. What is the best strategy here given this? Thanks in advance!

library(dplyr) 

A=data.frame(cbind('a', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
B=data.frame(cbind('b', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
C=data.frame(cbind('c', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)


chunk_join=function(i, A, B, C)
{
  A_i=A %>% filter(X2==i)
  B_i=B %>% filter(X2==i) %>% select(X1, X3)
  C_i=C %>% filter(X2==i) %>% select(X1, X3)
  join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
}

library(parallel)
library(foreach)
cl = parallel::makeCluster(10)
doParallel::registerDoParallel(cl)

# not parallel 

s1=Sys.time()
join1=data.frame()
join1 = foreach(j=1:10, .combine='rbind', 
                .packages=c('dplyr'), 
                .export=c('chunk_join','A', 'B', 'C')) %do%
                {
                  join_i=chunk_join(j, A, B, C)
                }
t1=Sys.time()-s1
colnames(join1)[4:5]=c('joinedB', 'joinedC')
r1=c(sum(!is.na(join1$joinedB)), sum(!is.na(join1$joinedC)))

# parallel 
s2=Sys.time()
join2=data.frame()
join2 = foreach(j=1:10, .combine='rbind', 
                .packages=c('dplyr'), 
                .export=c('chunk_join','A', 'B', 'C')) %dopar%
                {
                  join_i=chunk_join(j, A, B, C)
                }
t2=Sys.time()-s2
stopCluster(cl)
colnames(join2)[4:5]=c('joinedB', 'joinedC')
r2=c(sum(!is.na(join2$joinedB)), sum(!is.na(join2$joinedC)))

R=rbind(r1, r2)
T=rbind(t1, t2)

R
T

On my server this gives around 5s for %do% and over 1m for %dopar%. Note, that this is for join itself, without even taking into account time for making clusters. By the way, can someone also comment how many cluster shall I have? Say, I partition data on X even-sized chunks and have Y CPU available - shall I put Y - as much as possible, or X, or some other number of clusters?


Solution

  • There are two issues why your multithreading is slow:

    1) Data transfer to new threads 2) Data transfer from new threads back to main threads

    Issues #1 is completely avoided by using mclapply, which doesn't copy data unless it is modified, on unix systems. (makeCluster by default uses sockets to transfer data).

    Issue #2 cannot be avoided using mclapply, but what you can do is to minimize the amount of data you transfer back to the main thread.

    Naive mclapply:

    join3 = mclapply(1:10, function(j) {
      join_i=chunk_join(j, A, B, C)
    }, mc.cores=4) %>% rbindlist
    

    Slighty smarter mclapply:

    chunk_join2=function(i, A, B, C)
    {
      A_i=A %>% filter(X2==i)
      B_i=B %>% filter(X2==i) %>% select(X1, X3)
      C_i=C %>% filter(X2==i) %>% select(X1, X3)
      join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
      join_i[,c(-1,-2,-3)]
    }
    A <- arrange(A, X2)
    join5 = mclapply(1:10, function(j) {
      join_i=chunk_join2(j, A, B, C)
    }, mc.cores=4) %>% rbindlist
    join5 <- cbind(A, join5)
    

    Benchmarks:

    Single threaded: 4.014s 
    
    Naive mclapply: 1.860 s
    
    Slightly smarter mclapply: 1.363 s
    

    If your data has a lot of columns, you can see how Issue #2 will completely bog down the system. You can do even better by e.g. returning the indices of B and C instead of whole data.frame subset.