I am writing a function that processes several very large data.tables and I want to parallelize this function on a Windows machine.
I could do this with the snow package, using clusterExport to create a copy of each of the data.tables on every node in the cluster. However, this does not work because it uses too much memory. I want to fix the problem by exporting a different subset of the data.tables to each node, but I can't see how to do this with the snow package.
Here is a toy example of code that works but is memory-inefficient:
library(snow)
dd <- data.frame(a = rep(1:5, each = 2), b = 11:20)
cl <- makeCluster(2, type = "SOCK")
# Export a full copy of dd to every node:
clusterExport(cl = cl, "dd")
clusterApply(cl, x = c(2, 7), function(thresh) colMeans(dd[dd$a < thresh, ]))
stopCluster(cl)
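In this version every node ends up holding its own complete copy of dd, which is what blows up the memory. As an illustrative check (my addition, run before the stopCluster(cl) line above), each node can report the size of its copy:
# Each node holds a full copy of dd, so the reported sizes are identical:
clusterEvalQ(cl, object.size(dd))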
Here is an example of code that does not work, but illustrates how we would like to distribute subsets of dd to the nodes:
library(snow)
dd <- data.frame(a = rep(1:5, each = 2), b = 11:20)
cl <- makeCluster(2, type = "SOCK")
dd_exports <- lapply(c(2, 7), function(thresh) dd[dd$a < thresh, ])
# Now we want to export the ith element of dd_exports to the ith node,
# but clusterExport expects a character vector of variable names, not a list:
clusterExport(cl = cl, dd_exports)
clusterApply(cl, x = c(2, 7), function(x) colMeans(dd))
stopCluster(cl)
Since cl is a list, simply subset it during the clusterExport call:
library(data.table)
library(parallel)
dt <- data.table(a = rep(1:5, each = 2), b = 11:20)
cl <- makeCluster(2)
idx <- c(2, 7)
for (i in seq_along(cl)) {
  # Build the subset for node i, then export it to that node only;
  # cl[i] is a one-node sub-cluster, so clusterExport touches just node i
  dt2 <- dt[a < idx[i]]
  clusterExport(cl[i], "dt2")
}
rm(dt2)  # the master no longer needs the last subset
clusterEvalQ(cl, colMeans(dt2))
#> [[1]]
#>    a    b
#>  1.0 11.5
#>
#> [[2]]
#>    a    b
#>  3.0 15.5
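For more than two nodes, the same indexing trick generalizes. Here is a minimal sketch of a hypothetical helper, export_split() (the name and the envir handling are mine, not part of parallel), which sends the ith element of a list of subsets to the ith node; since a snow SOCK cluster is likewise just a list of node connections, the same subsetting should work there too:
library(data.table)
library(parallel)
# Hypothetical helper: export obj_list[[i]] to node i under the name `name`;
# assumes length(obj_list) == length(cl)
export_split <- function(cl, name, obj_list) {
  stopifnot(length(obj_list) == length(cl))
  for (i in seq_along(cl)) {
    # Put the ith subset in this function's environment so that
    # clusterExport(..., envir = environment()) can find it by name
    assign(name, obj_list[[i]])
    clusterExport(cl[i], name, envir = environment())
  }
  invisible(cl)
}
dt <- data.table(a = rep(1:5, each = 2), b = 11:20)
cl <- makeCluster(2)
export_split(cl, "dt2", list(dt[a < 2], dt[a < 7]))
clusterEvalQ(cl, colMeans(dt2))
stopCluster(cl)
Note that obj_list holds all subsets on the master at once; if that is itself too large, build each subset inside the loop as in the answer above.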