r, function, parallel-processing, cluster-computing

Understanding the impact of "makePSOCKcluster"


I am using the R programming language. Recently, I came across the R function makePSOCKcluster, which can apparently be used to "accelerate" certain tasks (https://www.rdocumentation.org/packages/future/versions/1.19.1/topics/makeClusterPSOCK).

Based on some posts that I have seen, it seems like makePSOCKcluster acts like a "wrapper" in which you place the code that you would like to "accelerate", for example:

library(doParallel)
library(future)

cl <- makePSOCKcluster(6) # 6 cpu cores out of 8

registerDoParallel(cl)

### enter your code here

stopCluster(cl) # when finished
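
From what I have read, the registered cluster is then typically used through a foreach loop with %dopar% - here is a toy sketch of that pattern (the loop body is just an illustration, not my real code):

library(doParallel)

cl <- makePSOCKcluster(6)
registerDoParallel(cl)

# each iteration of the foreach loop can run on a different worker
res <- foreach(i = 1:100, .combine = c) %dopar% {
    sqrt(i)
}

stopCluster(cl)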

I tried to adapt this setup to help accelerate an R procedure I am using:

cl <- makePSOCKcluster(6) # 6 cpu cores out of 8

registerDoParallel(cl)

### START

library(dplyr)
library(data.table)

results_table <- data.frame()

grid_function <- function(train_data, random_1, random_2, random_3, random_4, split_1, split_2, split_3) {
    
    
    
    #bin data according to random criteria
    train_data <- train_data %>% mutate(cat = ifelse(a1 <= random_1 & b1 <= random_3, "a", ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")))
    
    train_data$cat = as.factor(train_data$cat)
    
    #new splits
    a_table = train_data %>%
        filter(cat == "a") %>%
        select(a1, b1, c1, cat)
    
    b_table = train_data %>%
        filter(cat == "b") %>%
        select(a1, b1, c1, cat)
    
    c_table = train_data %>%
        filter(cat == "c") %>%
        select(a1, b1, c1, cat)
    
    
    #calculate random quantile ("quant") for each bin
    
    table_a = data.frame(a_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_1)))
    
    table_b = data.frame(b_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_2)))
    
    table_c = data.frame(c_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_3)))
    
    
    
    
    #create a new variable ("diff") that measures if the quantile is bigger than the value of "c1"
    table_a$diff = ifelse(table_a$quant > table_a$c1,1,0)
    table_b$diff = ifelse(table_b$quant > table_b$c1,1,0)
    table_c$diff = ifelse(table_c$quant > table_c$c1,1,0)
    
    #group all tables
    
    final_table = rbind(table_a, table_b, table_c)
    
    #create a table: for each bin, calculate the average of "diff"
    final_table_2 = data.frame(final_table %>%
                                   group_by(cat) %>%
                                   summarize(
                                       mean = mean(diff)
                                   ))
    
    #add "total mean" to this table
    final_table_2 = data.frame(final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff)))
    
    #format this table: add the random criteria to this table for reference
    final_table_2$random_1 = random_1
    
    final_table_2$random_2 = random_2
    
    final_table_2$random_3 = random_3
    
    final_table_2$random_4 = random_4
    
    final_table_2$split_1 = split_1
    
    final_table_2$split_2 = split_2
    
    final_table_2$split_3 = split_3
    
    
    
    
    results_table <- rbind(results_table, final_table_2)
    
    final_results = dcast(setDT(results_table), random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
    
}

# create some data for this example
a1 = rnorm(1000,100,10)
b1 = rnorm(1000,100,5)
c1 = sample.int(1000, 1000, replace = TRUE)
train_data = data.frame(a1,b1,c1)




#grid
random_1 <- seq(80,100,5)
random_2 <- seq(85,120,5)
random_3 <- seq(85,120,5)
random_4 <- seq(90,120,5)
split_1 =  seq(0,1,0.1)
split_2 =  seq(0,1,0.1)
split_3 =  seq(0,1,0.1)
DF_1 <- expand.grid(random_1 , random_2, random_3, random_4, split_1, split_2, split_3)

#reduce the size of the grid for this example
DF_1 = DF_1[1:100000,]

colnames(DF_1) <- c("random_1", "random_2", "random_3", "random_4", "split_1", "split_2", "split_3")

train_data_new <- copy(train_data)


resultdf1 <- apply(DF_1, 1, # 1 means rows
                   FUN = function(x){
                       do.call(
                           # call grid_function with the arguments in a list
                           grid_function,
                           # force list type for the arguments
                           c(list(train_data_new), as.list(
                               # convert the row to a named vector
                               unlist(x)
                           ))
                       )
                   }
)

final_output = rbindlist(resultdf1, fill = TRUE)

### END

stopCluster(cl) # when finished

If I were to run the above code "locally", it would take a very long time. I ran it with the "makePSOCKcluster" wrapper instead - the code is still running.

Question: I am not sure whether "makePSOCKcluster" will actually make a difference here, or whether I have used the wrapper correctly. Can someone please tell me if what I am doing is correct? Are there any other ways to accelerate this code?


Solution

  • Here is an attempt at it.

    1. Rewrite the function

    The function below is faster. I believe it could be made faster still if the data.frames were replaced by matrices or plain vectors (see the sketch after the function). The results are not identical to the output of the original function in the question because some NA's are now zeros.

    library(foreach)
    library(parallel)
    library(dplyr)
    library(data.table)
    
    ### START
    grid_function_2 <- function(train_data, X) {
      random_1234 <- X[1:4]
      split_123 <- X[-(1:4)]  # the three split_* values
      #bin data according to random criteria
      ia <- which(train_data$a1 <= random_1234[1] & train_data$b1 <= random_1234[3])
      ib <- which(train_data$a1 <= random_1234[2] & train_data$b1 <= random_1234[4])
      train_data$cat <- "c"
      train_data$cat[ia] <- "a"
      train_data$cat[ib] <- "b"
      train_data$cat <- factor(train_data$cat)
      #new splits
      table_abc <- split(train_data, train_data$cat)
    
      #calculate random quantile ("quant") for each bin
    
      table_abc <- lapply(seq_along(table_abc), function(i){
        quant <- quantile(table_abc[[i]]$c1, prob = split_123[i])
        diff <- as.integer(quant > table_abc[[i]]$c1)
        cbind(table_abc[[i]], diff = diff)
      })
    
      #group all tables
      final_table <- do.call(rbind, table_abc)
      final_table_2 <- final_table %>%
        group_by(cat) %>%
        summarise(mean = mean(diff))
    
      #add "total mean" to this table
      final_table_2 <- final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff))
    
      #format this table: add the random criteria to this table for reference
      final_table_2$random_1 <- random_1234[1]
      final_table_2$random_2 <- random_1234[2]
      final_table_2$random_3 <- random_1234[3]
      final_table_2$random_4 <- random_1234[4]
      final_table_2$split_1 <- split_123[1]
      final_table_2$split_2 <- split_123[2]
      final_table_2$split_3 <- split_123[3]
    
      dcast(as.data.table(final_table_2), random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
    }
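
    As an aside on the "matrices instead of data.frames" remark: below is a rough, vector-only sketch of the same binning and per-bin means. grid_function_vec is hypothetical, assumes every bin is non-empty, and returns a named vector rather than a one-row data.table:

    grid_function_vec <- function(a1, b1, c1, X) {
      # integer bin codes: 1 = "a", 2 = "b", 3 = "c"
      cat <- rep.int(3L, length(a1))
      cat[a1 <= X[1] & b1 <= X[3]] <- 1L
      cat[a1 <= X[2] & b1 <= X[4]] <- 2L  # "b" overrides "a", as in grid_function_2
      diff <- integer(length(c1))
      for (k in 1:3) {
        idx <- cat == k
        # 1 if the bin's random quantile exceeds c1, 0 otherwise
        diff[idx] <- as.integer(quantile(c1[idx], prob = X[4 + k]) > c1[idx])
      }
      c(a = mean(diff[cat == 1L]), b = mean(diff[cat == 2L]),
        c = mean(diff[cat == 3L]), total = mean(diff))
    }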
    

    2. The test data

    The test data sets are smaller and made reproducible by setting the RNG seed.

    # make the results reproducible
    set.seed(2021)
    
    # create some data for this example
    a1 <- rnorm(1000,100,10)
    b1 <- rnorm(1000,100,5)
    c1 <- sample.int(1000, 1000, replace = TRUE)
    train_data <- data.frame(a1,b1,c1)
    
    #grid
    n <- 3
    random_1 <- seq(80,100,5)[1:n]
    random_2 <- seq(85,120,5)[1:n]
    random_3 <- seq(85,120,5)[1:n]
    random_4 <- seq(90,120,5)[1:n]
    split_1 <- seq(0,1,0.1)[1:n]
    split_2 <- seq(0,1,0.1)[1:n]
    split_3 <- seq(0,1,0.1)[1:n]
    DF_1 <- expand.grid(random_1 , random_2, random_3, random_4, split_1, split_2, split_3)
    
    # smaller data set with 3^7 rows
    dim(DF_1)
    #[1] 2187    7
    
    ## NOT RUN
    #reduce the size of the grid for this example
    #DF_1 <- DF_1[1:100000,]
    
    colnames(DF_1) <- c("random_1" , "random_2", "random_3", "random_4",
                        "split_1", "split_2", "split_3")
    
    train_data_new <- copy(train_data)
    

    3. Timings

    And now the comparative tests.
    The reference value is t0, the time of the original apply loop and function from the question (reproduced below).

    Then there are three timed tests: apply with the rewritten function (t1), mclapply (t2, not available on Windows), and parLapply on a PSOCK cluster (t3).
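
    For completeness, this is how t0 was measured - simply the question's original code wrapped in system.time() (a sketch; it assumes grid_function from the question is defined in the session):

    t0 <- system.time({
      resultdf0 <- apply(DF_1, 1, FUN = function(x){
        # unchanged loop from the question
        do.call(grid_function, c(list(train_data_new), as.list(unlist(x))))
      })
    })
    final_output <- rbindlist(resultdf0, fill = TRUE)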

    t1 <- system.time({
      resultdf1 <- apply(DF_1, 1, # 1 means rows
                         FUN = function(x){
                           grid_function_2(train_data_new, x)
                         }
      )
    })
    final_output2 <- rbindlist(resultdf1, fill = TRUE)
    
    # mclapply uses forking, so it cannot be run on Windows
    t2 <- system.time({
      ncores <- detectCores()
      resultdf1 <- mclapply(1:nrow(DF_1), function(i){
        grid_function_2(train_data_new, DF_1[i, ])
      },
      mc.cores = ncores - 1L)
    })
    final_output3 <- rbindlist(resultdf1, fill = TRUE)
    
    t3 <- system.time({
      ncores <- detectCores()
      cl <- makePSOCKcluster(ncores - 1L) # reserve 1 cpu core
      # export the data and the function to every worker
      clusterExport(cl, c("train_data_new", "DF_1", "grid_function_2"))
      # attach the required packages on every worker
      clusterEvalQ(cl, library(dplyr))
      clusterEvalQ(cl, library(data.table))
      resultdf1 <- parLapply(
        cl = cl,
        X = 1:nrow(DF_1),
        function(i){
          grid_function_2(train_data_new, DF_1[i, ])
        }
      )
      stopCluster(cl) # when finished
    })
    final_output4 <- rbindlist(resultdf1, fill = TRUE)
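
    A variant not timed above: parLapplyLB from the parallel package hands out tasks dynamically (load balancing), which may help when some grid rows take longer than others. It would replace the parLapply call before stopCluster(cl):

    resultdf1 <- parLapplyLB(
      cl = cl,
      X = 1:nrow(DF_1),
      fun = function(i){
        grid_function_2(train_data_new, DF_1[i, ])
      }
    )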
    

    4. Check the results

    As noted above, there are NA value mismatches when comparing the original final_output with the new final_output2.

    all.equal(final_output, final_output2)
    #[1] "Column 'b': 'is.NA' value mismatch: 0 in current 243 in target"
    
    identical(final_output2, final_output3)
    #[1] TRUE
    identical(final_output2, final_output4)
    #[1] TRUE
    

    As for the timings, remove t2 if the code is to be run on Windows. The fastest are t2 and t3, by around an order of magnitude.

    rbind(t0, t1, t2, t3)