Tags: r, concurrency

How to run concurrent calculations and combine the results in R?


I have a calculation of interest that takes some time to compute. The function itself cannot naturally be split apart or optimized, and it is particularly ill suited to parallelization: later computations within the function may depend on the results of earlier ones.

However, there exists a natural chunking of the data such that computations within the chunks do not affect other chunks. I would like to leverage this natural chunking of the problem in order to speed up the calculation by allowing each chunk to execute in parallel.

As a toy example, here's some big data and an expensive function:

big.data = data.frame(x1 = c(1, 2, 3), 
                      x2 = c("a", "b", "c"), 
                      planet = c("Arrakis", "Arrakis", "Caladan")
                      )

expensive.function = function(big.data) {
  takes.a.lot.of.time = paste(big.data$x1, big.data$x2)
  return(takes.a.lot.of.time)
}

### Sequentially ###
result = expensive.function(big.data) # Very slow

In this case, the chunking would naturally occur over the "planet" field in the big data.

I have previously managed this by using the rstudioapi package to spin up a background job for each chunk. This splits the calculation into a Main.R script and a RunExpensiveFunction.R script, the latter of which is run once per chunk with different data. Each chunk's input and output data are stored in a separate subdirectory.

In code, the Main.R script looks like this:

### Main.R
library(writexl)
library(dplyr)
library(sys)

big.data = data.frame(x1 = c(1, 2, 3), 
                      x2 = c("a", "b", "c"), 
                      planet = c("Arrakis", "Arrakis", "Caladan")
                      )

all.planets = unique(big.data$planet)

for(big.rock in all.planets) {
  rows.in.question = big.data %>% filter(planet == big.rock)
  # Each planet gets its own subdirectory for that chunk's input and output
  directory = paste0(getwd(), "\\Planet Data\\", big.rock)
  dir.create(directory, recursive = TRUE, showWarnings = FALSE)
  write_xlsx(rows.in.question, paste0(directory, "\\rows.xlsx"))
  rstudioapi::jobRunScript("RunExpensiveFunction.R", workingDir = directory)
}

And the RunExpensiveFunction.R script looks like this:

### RunExpensiveFunction.R
# Faster, messier
library(readxl)
library(writexl)
chunked.data = read_excel("rows.xlsx")

expensive.function = function(chunked.data) {
  takes.a.lot.of.time = paste(chunked.data$x1, chunked.data$x2)
  return(takes.a.lot.of.time)
}

planet = chunked.data[1, ]$planet
result = expensive.function(chunked.data)
# write_xlsx() expects a data frame, so wrap the result vector before writing
write_xlsx(data.frame(result = result), paste0("all done with ", planet, ".xlsx"))

While this works as intended and speeds up the calculation, recombining the data and continuing afterwards turns into a bit of a mess with a high chance of user error (for example, attempting to recombine everything before all the chunks have finished processing).
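
For context, the manual recombination step currently looks roughly like the sketch below (hypothetical code: the file names and directories follow the pattern used by the two scripts above, and nothing stops it from being run before every background job has finished):

### Recombine.R -- roughly what the manual step looks like
library(readxl)

all.planets = c("Arrakis", "Caladan")
combined.result = vector()
for(big.rock in all.planets) {
  output.file = paste0("Planet Data\\", big.rock, "\\all done with ", big.rock, ".xlsx")
  if(!file.exists(output.file)) {
    # Easy to get wrong: the background job for this planet may still be running
    stop(paste(big.rock, "has not finished yet"))
  }
  combined.result = append(combined.result, read_excel(output.file)$result)
}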

Is there a cleaner (and perhaps faster) way to process and recombine each chunk in parallel in one script?

I'm leery of any sort of functional "apply" method: my toy example is obviously a candidate for one, but my actual function is not. Perhaps that fear is unfounded at the chunk level, though. If my toy example needs more detail to make that clearer, I will supply it.


Solution

  • This problem, as was suggested, is a prime candidate for mclapply. However, mclapply relies on forking, which Windows does not support, so in the author's case it cannot run in parallel there (a sketch for non-Windows platforms follows the parLapply example below).

    A similar solution is available via the parLapply function from the parallel package, which does work on Windows because it uses a socket cluster instead of forking.

    Utilizing either function requires a slight rework of how the script stores its data.

    ### Main.R -- Data Processing
    library(dplyr)
    
    big.data = data.frame(x1 = c(1, 2, 3), 
                          x2 = c("a", "b", "c"), 
                          planet = c("Arrakis", "Arrakis", "Caladan")
    )
    
    all.planets = unique(big.data$planet)
    stored.data = list()    
    
    for(big.rock in all.planets) {
      rows.in.question = big.data %>% filter(planet == big.rock)
      stored.data = append(stored.data, list(rows.in.question))
    }
    

    stored.data then becomes a list of data frames, one per planet, each holding the rows that expensive.function needs.
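
    As an aside, base R's split() builds an equivalent list of per-planet data frames in one call, which may be less error prone than growing the list inside a loop:

    # Equivalent chunking without the explicit loop; elements are named by planet
    stored.data = split(big.data, big.data$planet)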

    To actually execute expensive.function in parallel, some further setup is needed.

    ### Main.R -- Execution
    # Faster, not as messy
    library(parallel)
    
    expensive.function = function(chunked.data) {
      takes.a.lot.of.time = paste(chunked.data$x1, chunked.data$x2)
      return(list(takes.a.lot.of.time)) # Return a list to preserve inner output.
    }
    
    # Set up the parallel cluster
    cl = makeCluster(2) # 2 cores used
    # Export the function(s) the workers will need
    clusterExport(cl, c('expensive.function'))
    # Packages can be loaded on the workers like so:
    # clusterEvalQ(cl, library("dplyr"))
    
    # Execute expensive.function on each chunk in stored.data
    par.result = parLapply(cl, stored.data, expensive.function)
    stopCluster(cl) # Shut the cluster down when finished
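
    On macOS or Linux, where forking is available, the same chunk-level parallelism can be had without an explicit cluster via mclapply. A minimal sketch, assuming 2 cores as above:

    ### Main.R -- Execution (non-Windows sketch)
    library(parallel)
    
    # Forked workers inherit the current session, so expensive.function
    # and stored.data do not need to be exported to them.
    par.result = mclapply(stored.data, expensive.function, mc.cores = 2)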
    

    How the result is handled will depend on what the desired output looks like. In this case, it might make sense to combine all the outputs into a single vector.

    ### Main.R -- Wrap up
    # Obtain a single vector of data
    concat.result = vector()
    for(i in seq_along(par.result)) {
      concat.result = append(concat.result, unlist(par.result[[i]]))
    }
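
    Since parLapply() already returns a list, the same vector can also be produced in one call, avoiding the grow-in-a-loop pattern:

    # Equivalent one-liner
    concat.result = unlist(par.result)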