
R, dplyr and snow: how to parallelize functions which use dplyr


Let's suppose that I want to apply, in parallel, myfunction to each row of myDataFrame. Suppose that otherDataFrame is a data frame with two columns, COLUMN1_odf and COLUMN2_odf, which are used inside myfunction. So I would like to write code using parApply like this:

clus <- makeCluster(4)
clusterExport(clus, list("myfunction","%>%"))

myfunction <- function(fst, snd) {
 #otherFunction and aGlobalDataFrame are defined in the global env
 otherFunction(aGlobalDataFrame)

 # some code to create otherDataFrame **INTERNALLY** to this function
 otherDataFrame <- otherDataFrame %>% filter(COLUMN1_odf==fst & COLUMN2_odf==snd)
 return(otherDataFrame)
}
do.call(bind_rows, parApply(clus, myDataFrame, 1, function(r) { myfunction(r[1], r[2]) }))

The problem here is that R doesn't recognize COLUMN1_odf and COLUMN2_odf even if I add them to clusterExport. How can I solve this problem? Is there a way to "export" all the objects that snow needs, so that I don't have to enumerate each of them?
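For instance, I imagine something like passing the whole listing of the global environment to clusterExport (a sketch below, assuming everything myfunction needs is defined in the master's global environment), but I'm not sure this is the right approach, and in any case COLUMN1_odf and COLUMN2_odf are column names rather than objects, so they only resolve once dplyr is available on the workers:

library(snow)

clus <- makeCluster(4)
# export every object (data and functions) currently in the master's global
# environment; note this does NOT attach packages such as dplyr on the workers
clusterExport(clus, ls(envir = globalenv()))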

EDIT 1: I've added a comment (in the code above) to specify that otherDataFrame is created internally to myfunction.

EDIT 2: I've added some pseudo-code in order to generalize myfunction: it now uses a global data frame (aGlobalDataFrame) and another function (otherFunction).


Solution

  • After some experiments, I solved my problem (following Benjamin's suggestion and taking into account the edits I added to the question) with:

    myfunction <- function(fst, snd) {
     # otherFunction and aGlobalDataFrame are defined in the global env
     otherFunction(aGlobalDataFrame)
    
     # some code to create otherDataFrame **INTERNALLY** to this function
     otherDataFrame <- otherDataFrame %>% dplyr::filter(COLUMN1_odf==fst & COLUMN2_odf==snd)
     return(otherDataFrame)
    }
    
    clus <- makeCluster(4)
    # load dplyr and magrittr on every worker
    clusterEvalQ(clus, {library(dplyr); library(magrittr)})
    # export, by name, the function and the global objects it relies on
    clusterExport(clus, c("myfunction", "otherFunction", "aGlobalDataFrame"))
    
    do.call(bind_rows, parApply(clus, myDataFrame, 1,
            function(r) { myfunction(r[1], r[2]) }))
    

    In this way I've exported aGlobalDataFrame, myfunction and otherFunction: in short, all the functions and data used by the function that does the parallel work (myfunction itself).
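
    For reference, here is a self-contained toy version of the same pattern (using the parallel package, whose cluster functions mirror snow's); the data frames, otherFunction and the column values below are just made-up placeholders for the real objects:

    library(parallel)
    library(dplyr)

    # made-up placeholders standing in for the real objects
    aGlobalDataFrame <- data.frame(x = 1:10)
    otherFunction    <- function(df) nrow(df)   # placeholder computation
    myDataFrame      <- data.frame(a = c(1, 2), b = c("u", "v"))

    myfunction <- function(fst, snd) {
     otherFunction(aGlobalDataFrame)
     # otherDataFrame is created internally, then filtered on the two columns
     otherDataFrame <- data.frame(COLUMN1_odf = c(1, 2), COLUMN2_odf = c("u", "v"))
     otherDataFrame %>% filter(COLUMN1_odf == fst & COLUMN2_odf == snd)
    }

    clus <- makeCluster(2)
    clusterEvalQ(clus, library(dplyr))
    clusterExport(clus, c("myfunction", "otherFunction", "aGlobalDataFrame"))

    # apply() coerces rows to character, hence the as.numeric() on the first element
    result <- do.call(bind_rows, parApply(clus, myDataFrame, 1,
                      function(r) myfunction(as.numeric(r[1]), r[2])))
    stopCluster(clus)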