Tags: r, parallel-processing, snow

Export different subsets of data.tables to each node in a cluster


I am writing a function that processes several very large data.tables and I want to parallelize this function on a Windows machine.

I could do this with the snow package, using clusterExport to create a copy of each data.table on every node in the cluster. However, this does not work because it uses too much memory.

I want to fix this by exporting a different subset of the data.tables to each node; however, I can't see how to do that in the snow package.

Here is a toy example of code that works but is memory inefficient:

library(snow)
dd <- data.frame(a = rep(1:5, each = 2), b = 11:20)
cl <- makeCluster(2, type = "SOCK")
clusterExport(cl = cl, "dd")
clusterApply(cl, x = c(2,7),  function(thresh) colMeans(dd[dd$a < thresh,]))
stopCluster(cl)

Here is an example of code that does not work but explains how we would like to distribute subsets of dd to the nodes:

library(snow)
dd <- data.frame(a = rep(1:5, each = 2), b = 11:20)
cl <- makeCluster(2, type = "SOCK")

dd_exports <- lapply(c(2, 7), function(thresh) dd[dd$a < thresh, ])
# Now we want to export the ith element of dd_exports to the ith node,
# but clusterExport expects a character vector of object names, so this fails:
clusterExport(cl = cl, dd_exports)
clusterApply(cl, x = c(2,7),  function(x) colMeans(dd))
stopCluster(cl)

Solution

  • Since a cluster object is just a list of nodes, `cl[i]` is itself a valid one-node cluster, so you can subset it during the clusterExport call and send a different object to each node:

    library(data.table)
    library(parallel)
    
    dt <- data.table(a = rep(1:5, each = 2), b = 11:20)
    cl <- makeCluster(2)
    idx <- c(2, 7)
    
    for (i in seq_along(cl)) {
      # cl[i] is a one-node cluster: export node i's subset only
      dt2 <- dt[a < idx[i]]
      clusterExport(cl[i], "dt2")
    }
    rm(dt2)
    clusterEvalQ(cl, colMeans(dt2))
    #> [[1]]
    #>    a    b 
    #>  1.0 11.5 
    #> 
    #> [[2]]
    #>    a    b 
    #>  3.0 15.5
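
If the subsets can be computed up front, the same effect is available without any explicit export: `clusterApply()` already sends the i-th element of its `x` argument to the i-th node, so the full table never has to be copied to every worker. A minimal sketch of that alternative (the names `dd_subsets` and `res` are mine, not from the question):

```r
library(parallel)

dd <- data.frame(a = rep(1:5, each = 2), b = 11:20)
cl <- makeCluster(2)

# Build one subset per node on the master; clusterApply() serializes
# the i-th element of its second argument to the i-th node only, so
# the full dd is never shipped to every worker.
dd_subsets <- lapply(c(2, 7), function(thresh) dd[dd$a < thresh, ])
res <- clusterApply(cl, dd_subsets, colMeans)
stopCluster(cl)
res
#> [[1]]
#>    a    b 
#>  1.0 11.5 
#> 
#> [[2]]
#>    a    b 
#>  3.0 15.5
```

The trade-off: each subset is still serialized from the master at call time, so peak memory on the master is unchanged; the gain is that each worker holds only its own slice, and no variables linger in the workers' global environments afterwards.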