I am using the foreach function in R for parallel processing and have encountered some confusion regarding the use of the .export argument. In the past, I faced issues which I temporarily resolved by exporting several items, including a large data.table (approximately 6GB). However, this approach significantly increases memory usage since each parallel process receives its own copy of the data.table.
The foreach documentation describes the .export argument as:
.export: character vector of variables to export. This can be useful when accessing a variable that isn’t defined in the current environment. The default value is NULL.
I’m trying to understand what “isn’t defined in the current environment” means in this context, and in particular when it is necessary to export the entire dataset. Should I always export the data, or are there more memory-efficient strategies for handling such large datasets in parallel processing?
Here's a minimal example:
# Load necessary libraries
library(data.table)
library(foreach)
library(doParallel)
# Dummy data for reproducibility
set.seed(123)
data <- data.table(id = rep(1:10, each = 100), time = rep(1:100, times = 10),
                   a = rnorm(1000), b = rnorm(1000))
ids <- unique(data$id)
# Dummy function for processing
processData <- function(dt) {
  # Simulated operation (mean calculation as a placeholder)
  mean(dt[, a])
}
# Detect cores and register parallel backend
num_cores <- detectCores() - 1
registerDoParallel(cores = num_cores)
# Run the loop in parallel
results <- foreach(
  i = seq_along(ids),
  .combine = rbind,
  .packages = c("data.table"),
  .export = c("processData", "data", "ids")
) %dopar% {
  model <- processData(data[id == ids[i]])
  # Return result
  model
}
# Stop the implicit cluster
stopImplicitCluster()
# Print results
print(results)
Could someone clarify the proper use of .export in such cases, or suggest a more memory-efficient method for parallel processing with large data.tables?
I've been a fan of using the future.apply package for this kind of thing, together with data.table:::split.data.table(). That said, I'm not certain it avoids duplicating the data in memory.
# data
set.seed(123)
data <- data.table::data.table(
  id = rep(1:10, each = 100),
  time = rep(1:100, times = 10),
  a = rnorm(1000),
  b = rnorm(1000)
)
# split into smaller data.tables based on id
data <- split(data, by = "id")
# function to apply
f <- function(x) {
  m <- mean(x[, a])
  return(m)
}
# run the function for each id/data
lapply(data, f)
# for parallel, set a plan (pick the one that fits your OS)
future::plan("multisession")  # works everywhere, including Windows
# future::plan("multicore")   # forked processes; Linux / macOS only
# run the same parallelized
future.apply::future_lapply(data, f)
Assuming what you're returning is a data.table or data.frame, you could use data.table::rbindlist() or dplyr::bind_rows() afterward to combine:
f <- function(x) {
  m <- x[1:3, .(a)]
  return(m)
}
res <- future.apply::future_lapply(data, f)
res <- data.table::rbindlist(res)
head(res)
If that ends up eating too much memory, you could always try writing each subset out to disk and reading it back in inside the function; that way the data is never duplicated in memory. Otherwise you could also try writing to a SQLite file (there's a rough sketch of that at the end). Something like...
# data
set.seed(123)
data <- data.table::data.table(
  id = rep(1:10, each = 100),
  time = rep(1:100, times = 10),
  a = rnorm(1000),
  b = rnorm(1000)
)
# split into smaller data.tables based on id
data <- split(data, by = "id")
# function to write each subset out to its own csv
write_subset <- function(x) {
  id <- x[, unique(id)]
  path <- fs::path("path", "to", "folder", id, ext = "csv")
  data.table::fwrite(x, path)
  return(path)
}
# write
lapply(data, write_subset)
Then, in a fresh session (probably a separate script) you would:
# get paths of all .csv files that you wrote out
data_paths <- fs::path("path", "to", "folder") |> fs::dir_ls()
# function to process
f <- function(x) {
  x <- data.table::fread(x)
  m <- x[1:3, .(a)]
  return(m)
}
# then lapply or future.apply::future_lapply to get results
future.apply::future_lapply(data_paths, f)
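For the SQLite route mentioned earlier, a rough sketch could look like the following. This is just a sketch I haven't run against your data: I'm assuming the DBI and RSQLite packages here (any SQLite client would do), and the file path and table name are placeholders.
# write all subsets into one SQLite table (done once, in the prep script where data is the split list)
con <- DBI::dbConnect(RSQLite::SQLite(), "path/to/data.sqlite")
lapply(data, function(x) DBI::dbWriteTable(con, "subsets", x, append = TRUE))
DBI::dbDisconnect(con)
# in the processing script, grab the ids once...
con <- DBI::dbConnect(RSQLite::SQLite(), "path/to/data.sqlite")
ids <- DBI::dbGetQuery(con, "SELECT DISTINCT id FROM subsets")$id
DBI::dbDisconnect(con)
# ...then have each worker open its own connection and pull only its rows
f <- function(one_id) {
  con <- DBI::dbConnect(RSQLite::SQLite(), "path/to/data.sqlite")
  on.exit(DBI::dbDisconnect(con))
  x <- data.table::as.data.table(
    DBI::dbGetQuery(con, "SELECT * FROM subsets WHERE id = ?", params = list(one_id))
  )
  x[1:3, .(a)]
}
res <- future.apply::future_lapply(ids, f)
That way no single process ever holds more than one id's rows in memory, though you trade that for reading from disk inside each call.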
Anyway, those are some thoughts. Hope that helps!