
How to pass the full dataset to one worker once, and specific subsets to the other workers, in a foreach loop using isplit()


I am currently fitting a set of models on a subset of data for each level of a factor variable. As the models take a long time to run, I use the foreach and doParallel packages to estimate the set of models for each level of the variable in parallel using %dopar%. To avoid memory issues, I pass only the relevant subset of data to each worker, using the isplit() function from the iterators package.

Now, my question is how to extend my code so that in the first iteration the models are estimated on the whole dataset, by passing the full dataset to one of the workers. In the subsequent iterations, I want to pass only a subset of the data to each worker and estimate the models.

I illustrate my problem below using example data from the mtcars dataset.

Suppose I want to calculate the average number of forward gears a car has (gear column), by the number of cylinders (cyl column), in parallel.

First, load the packages and import the data:

library(doParallel)
library(foreach)
library(iterators)
library(dplyr)
#get sample data to illustrate problem  
data("mtcars")
df <- mtcars
df$cyl <- as.factor(df$cyl) #make cyl categorical 

Next, iterate over each level of the cyl column and do the necessary calculations:

mycluster <- makeCluster(3)
registerDoParallel(mycluster)

result <- foreach(subset = isplit(df, df$cyl), .combine = "c", .packages = "dplyr") %dopar% {
  x <- summarise(subset$value, mean(gear, na.rm = T))
  return(x)
}
stopCluster(mycluster)

The result is a list containing the average number of gears for each number of cylinders.

> result
$`mean(gear, na.rm = T)`
[1] 4.090909

$`mean(gear, na.rm = T)`
[1] 3.857143

$`mean(gear, na.rm = T)`
[1] 3.285714

Now, what I want is to extend this code so that I have four iterations. In the first iteration, I want to pass the full dataset to the first worker and calculate the average number of gears for all cars in the dataset. Next, I want to pass the specific subset of data for each level of cyl to the other workers and calculate the average number of gears, as shown above. So the only new thing is one extra iteration, ahead of those produced by isplit(), in which the full dataset is passed.

Expected output:

> result
$`mean(gear, na.rm = T)` #average number of gears across all cars in dataset
[1] 3.6875

$`mean(gear, na.rm = T)`
[1] 4.090909

$`mean(gear, na.rm = T)`
[1] 3.857143

$`mean(gear, na.rm = T)`
[1] 3.285714

I know the example is silly, but it illustrates what I am trying to achieve. In reality, I use a very large dataset and estimate a couple of models that each take a long time to run. The data are from a census, however, so I cannot share even a few lines of them.


Solution

  • Create a new iterator method

    If you weren't using iterators, this would be as simple as passing subset = c(list(df), split(df, df$cyl)). However, this syntax will not work with isplit(). More importantly, a data frame iterator is basically a list with a $nextElem() function that, when called, returns the next subset of the data. To make sure the elements are produced in the expected order (the full data first, then each subset), it is simplest to use just one iterator.
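
For comparison, here is a minimal sketch of the non-iterator alternative mentioned above. It builds every piece up front as an ordinary list, so all subsets sit in memory at once, which is what isplit() is meant to avoid. The list name all_data and the use of %do% (sequential, so no cluster is needed) are illustrative choices, not part of the original code.

```r
library(foreach)

df <- mtcars
df$cyl <- as.factor(df$cyl)

# Eager alternative: the full data frame followed by one data frame per level.
# The name "all_data" is an illustrative label, not part of any API.
pieces <- c(list(all_data = df), split(df, df$cyl))

# %do% runs sequentially, so this can be tried without registering a backend
result <- foreach(subset = pieces, .combine = "c") %do% {
  mean(subset$gear, na.rm = TRUE)
}
names(result) <- names(pieces)
result
```

This is fine for small data such as mtcars; with a large dataset the list of copies is the part that may become costly.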

    Let's write a function that creates an iterator for a new class, extra. This will be based on iterators::isplit.data.frame(). The difference, however, is that on the first iteration it will yield the entire data frame:

    isplit.extra <- function(x, f, drop = FALSE, ...) {
        first_iter <- TRUE
        it <- isplit(seq_len(nrow(x)), f, drop = drop, ...)
        nextEl <- function() {
            # On first iteration return the entire data frame
            if (first_iter) {
                first_iter <<- FALSE
                return(list(value = x, key = "all_data"))
            }
            # Otherwise split the data frame in the normal way
            i <- nextElem(it)
            list(value = x[i$value, , drop = FALSE], key = i$key)
        }
        structure(list(nextElem = nextEl), class = c("abstractiter", "iter"))
    }
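
To see what this iterator yields, it can be stepped through by hand with nextElem(), without any parallel backend. The sketch below repeats the definition so that it runs on its own; the rows vector and the loop are only there for illustration.

```r
library(iterators)

# Definition repeated from above so that this snippet is self-contained
isplit.extra <- function(x, f, drop = FALSE, ...) {
  first_iter <- TRUE
  it <- isplit(seq_len(nrow(x)), f, drop = drop, ...)
  nextEl <- function() {
    if (first_iter) {
      first_iter <<- FALSE
      return(list(value = x, key = "all_data"))
    }
    i <- nextElem(it)
    list(value = x[i$value, , drop = FALSE], key = i$key)
  }
  structure(list(nextElem = nextEl), class = c("abstractiter", "iter"))
}

df <- mtcars
df$cyl <- as.factor(df$cyl)
class(df) <- c("extra", class(df))  # makes isplit() dispatch to isplit.extra()

it <- isplit(df, df$cyl)
first <- nextElem(it)               # the full data frame, keyed "all_data"
rows <- nrow(first$value)

# One further call per level of cyl; each returns only that level's rows
for (k in seq_len(nlevels(df$cyl))) {
  el <- nextElem(it)
  rows <- c(rows, nrow(el$value))
}
rows
```

The first element holds all rows, and the later elements together hold the same rows split by cyl.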
    

    Using this method

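    We can assign class extra to your df, and there's then no need for further changes, as isplit() will dispatch to the appropriate method. Note that your example ends with stopCluster(), so a cluster has to be created and registered again before running the loop.

    I have changed your summarise() call to use the := ("walrus") operator to create a more useful column name. I've marked the lines I've changed.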

    class(df) <- c("extra", class(df)) # new
    result <- foreach(
        subset = isplit(df, df$cyl), # no need to change this
        .combine = "c", .packages = "dplyr"
    ) %dopar% {
        summarise(
            subset$value,
            "mean_gear_cyl_{subset$key}" := mean(gear, na.rm = T) # changed
        )
    }
    

    The output is:

    $mean_gear_cyl_all_data
    [1] 3.6875
    
    $mean_gear_cyl_4
    [1] 4.090909
    
    $mean_gear_cyl_6
    [1] 3.857143
    
    $mean_gear_cyl_8
    [1] 3.285714
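
The column naming used in the summarise() call can also be tried in isolation. This is a small sketch assuming a dplyr version that supports glue-style names on the left of := (1.0.0 or later); the variable key is a hypothetical stand-in for subset$key.

```r
library(dplyr)

key <- "all_data"  # hypothetical stand-in for subset$key

# The text inside {} is evaluated and pasted into the new column's name
out <- summarise(mtcars, "mean_gear_cyl_{key}" := mean(gear, na.rm = TRUE))
names(out)
```

Because the key ends up in the name, each element of the combined result identifies the subset it was computed from.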