I am using the R programming language. Recently, I came across the function makePSOCKcluster, which can reportedly be used to "accelerate" certain R tasks (https://www.rdocumentation.org/packages/future/versions/1.19.1/topics/makeClusterPSOCK).
Based on some posts I have seen, makePSOCKcluster seems to act like a "wrapper" around the code you would like to accelerate, for example:
library(doParallel)
library(future)
cl <- makePSOCKcluster(6) # 6 cpu cores out of 8
registerDoParallel(cl)
### enter your code here
stopCluster(cl) # when finished
I tried to adapt this setup to help accelerate an R procedure I am using:
cl <- makePSOCKcluster(6) # 6 cpu cores out of 8
registerDoParallel(cl)
### START
library(dplyr)
library(data.table)
results_table <- data.frame()
grid_function <- function(train_data, random_1, random_2, random_3, random_4, split_1, split_2, split_3) {
  #bin data according to random criteria
  train_data <- train_data %>%
    mutate(cat = ifelse(a1 <= random_1 & b1 <= random_3, "a",
                 ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")))
  train_data$cat = as.factor(train_data$cat)
  #new splits
  a_table = train_data %>%
    filter(cat == "a") %>%
    select(a1, b1, c1, cat)
  b_table = train_data %>%
    filter(cat == "b") %>%
    select(a1, b1, c1, cat)
  c_table = train_data %>%
    filter(cat == "c") %>%
    select(a1, b1, c1, cat)
  #calculate random quantile ("quant") for each bin
  table_a = data.frame(a_table %>% group_by(cat) %>%
    mutate(quant = quantile(c1, prob = split_1)))
  table_b = data.frame(b_table %>% group_by(cat) %>%
    mutate(quant = quantile(c1, prob = split_2)))
  table_c = data.frame(c_table %>% group_by(cat) %>%
    mutate(quant = quantile(c1, prob = split_3)))
  #create a new variable ("diff") that measures if the quantile is bigger than the value of "c1"
  table_a$diff = ifelse(table_a$quant > table_a$c1, 1, 0)
  table_b$diff = ifelse(table_b$quant > table_b$c1, 1, 0)
  table_c$diff = ifelse(table_c$quant > table_c$c1, 1, 0)
  #group all tables
  final_table = rbind(table_a, table_b, table_c)
  #create a table: for each bin, calculate the average of "diff"
  final_table_2 = data.frame(final_table %>%
    group_by(cat) %>%
    summarize(mean = mean(diff)))
  #add "total mean" to this table
  final_table_2 = data.frame(final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff)))
  #format this table: add the random criteria to this table for reference
  final_table_2$random_1 = random_1
  final_table_2$random_2 = random_2
  final_table_2$random_3 = random_3
  final_table_2$random_4 = random_4
  final_table_2$split_1 = split_1
  final_table_2$split_2 = split_2
  final_table_2$split_3 = split_3
  results_table <- rbind(results_table, final_table_2)
  final_results = dcast(setDT(results_table), random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
}
# create some data for this example
a1 = rnorm(1000,100,10)
b1 = rnorm(1000,100,5)
c1 = sample.int(1000, 1000, replace = TRUE)
train_data = data.frame(a1,b1,c1)
#grid
random_1 <- seq(80,100,5)
random_2 <- seq(85,120,5)
random_3 <- seq(85,120,5)
random_4 <- seq(90,120,5)
split_1 = seq(0,1,0.1)
split_2 = seq(0,1,0.1)
split_3 = seq(0,1,0.1)
DF_1 <- expand.grid(random_1, random_2, random_3, random_4, split_1, split_2, split_3)
#reduce the size of the grid for this example
DF_1 = DF_1[1:100000,]
colnames(DF_1) <- c("random_1", "random_2", "random_3", "random_4", "split_1", "split_2", "split_3")
train_data_new <- copy(train_data)
resultdf1 <- apply(DF_1, 1, # 1 means rows
  FUN = function(x){
    do.call(
      # call grid_function with the arguments in a list
      grid_function,
      # force list type for the arguments; unlist() makes the row a named vector
      c(list(train_data_new), as.list(unlist(x)))
    )
  }
)
l = resultdf1
final_output = rbindlist(l, fill = TRUE)
### END
# when finished
stopCluster(cl)
If I were to run the above code "locally" (i.e. sequentially), it would take a very long time. I ran it with the makePSOCKcluster wrapper as shown, and the code is still running.
Question: I am not sure whether makePSOCKcluster will actually make a difference here, and I am not sure I have used the wrapper correctly. Can someone please tell me if what I am doing is correct? Are there any other ways to accelerate this code?
Here is an attempt at it.
The function below is faster, and I believe it could be made faster still if the data.frames were replaced by matrices (a rough sketch of that idea follows the function). The results are not identical to the output of the original function in the question because some values that were NA are now zeros.
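One remark first: creating a cluster and calling registerDoParallel(cl) does not, by itself, parallelize the code placed between the calls; only constructs that dispatch work to the cluster, such as foreach ... %dopar% or the parLapply() used further down, actually run on the workers. A minimal sketch of the %dopar% pattern, with a toy task unrelated to the grid search:
library(doParallel)
cl <- makePSOCKcluster(2)
registerDoParallel(cl)
# only the body of the foreach loop runs on the workers;
# plain code between registerDoParallel() and stopCluster() stays sequential
res <- foreach(i = 1:4, .combine = c) %dopar% sqrt(i)
stopCluster(cl)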
library(parallel)
library(dplyr)
library(data.table)
### START
grid_function_2 <- function(train_data, X) {
  random_1234 <- X[1:4]
  split_123 <- X[-(1:4)]
  #bin data according to random criteria
  ia <- which(train_data$a1 <= random_1234[1] & train_data$b1 <= random_1234[3])
  ib <- which(train_data$a1 <= random_1234[2] & train_data$b1 <= random_1234[4])
  train_data$cat <- "c"
  train_data$cat[ia] <- "a"
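  # note: a row matching both conditions ends up in "b" (the next assignment
  # overwrites "a"), whereas the nested ifelse in the question gives "a" precedence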
  train_data$cat[ib] <- "b"
  train_data$cat <- factor(train_data$cat)
  #new splits
  table_abc <- split(train_data, train_data$cat)
  #calculate random quantile ("quant") for each bin
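  # (split_123[i] is matched to the bins by position, which assumes all three
  # bins "a", "b" and "c" are non-empty)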
  table_abc <- lapply(seq_along(table_abc), function(i){
    quant <- quantile(table_abc[[i]]$c1, prob = split_123[i])
    diff <- as.integer(quant > table_abc[[i]]$c1)
    cbind(table_abc[[i]], diff = diff)
  })
  #group all tables
  final_table <- do.call(rbind, table_abc)
  final_table_2 <- final_table %>%
    group_by(cat) %>%
    summarise(mean = mean(diff))
  #add "total mean" to this table
  final_table_2 <- final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff))
  #format this table: add the random criteria to this table for reference
  final_table_2$random_1 <- random_1234[1]
  final_table_2$random_2 <- random_1234[2]
  final_table_2$random_3 <- random_1234[3]
  final_table_2$random_4 <- random_1234[4]
  final_table_2$split_1 <- split_123[1]
  final_table_2$split_2 <- split_123[2]
  final_table_2$split_3 <- split_123[3]
  dcast(as.data.table(final_table_2), random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
}
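As for the matrices remark above, here is a rough sketch of that idea (an illustration only, not used in the timings below): it works on plain vectors, keeps the original split_1 -> "a", split_2 -> "b", split_3 -> "c" mapping, and returns a named vector rather than a data.table.
grid_function_3 <- function(a1, b1, c1, X) {
  # bin each row: 1 = "a", 2 = "b", 3 = "c"; assign "b" before "a" so that
  # "a" wins ties, as in the nested ifelse of the question
  bin <- rep.int(3L, length(a1))
  bin[a1 <= X[2] & b1 <= X[4]] <- 2L
  bin[a1 <= X[1] & b1 <= X[3]] <- 1L
  diff <- numeric(length(c1))
  means <- rep(NA_real_, 3)
  for (i in 1:3) {
    idx <- bin == i
    if (any(idx)) {
      # split_1..split_3 live in X[5:7]
      quant <- quantile(c1[idx], probs = X[4 + i], names = FALSE)
      diff[idx] <- as.numeric(quant > c1[idx])
      means[i] <- mean(diff[idx])
    }
  }
  c(X, a = means[1], b = means[2], c = means[3], total = mean(diff))
}
rbind()-ing these vectors over the grid rows would then replace the rbindlist() step.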
The test data sets are smaller and made reproducible by setting the RNG seed.
# make the results reproducible
set.seed(2021)
# create some data for this example
a1 <- rnorm(1000,100,10)
b1 <- rnorm(1000,100,5)
c1 <- sample.int(1000, 1000, replace = TRUE)
train_data <- data.frame(a1,b1,c1)
#grid
n <- 3
random_1 <- seq(80,100,5)[1:n]
random_2 <- seq(85,120,5)[1:n]
random_3 <- seq(85,120,5)[1:n]
random_4 <- seq(90,120,5)[1:n]
split_1 <- seq(0,1,0.1)[1:n]
split_2 <- seq(0,1,0.1)[1:n]
split_3 <- seq(0,1,0.1)[1:n]
DF_1 <- expand.grid(random_1, random_2, random_3, random_4, split_1, split_2, split_3)
# smaller data set with 3^7 rows
dim(DF_1)
#[1] 2187 7
## NOT RUN
#reduce the size of the grid for this example
#DF_1 <- DF_1[1:100000,]
colnames(DF_1) <- c("random_1", "random_2", "random_3", "random_4",
                    "split_1", "split_2", "split_3")
train_data_new <- copy(train_data)
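As a quick sanity check (not part of the timings), the new function can be called on a single grid row; unlist() turns the row into the kind of named numeric vector that apply() will also pass:
# one parameter combination; returns a one-row data.table
grid_function_2(train_data_new, unlist(DF_1[1, ]))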
And now the comparative tests.
The reference value t0 comes from timing the original apply loop and grid_function from the question on this smaller grid.
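Spelled out (this also produces the final_output used in the comparisons below):
t0 <- system.time({
  resultdf0 <- apply(DF_1, 1, FUN = function(x){
    do.call(grid_function, c(list(train_data_new), as.list(unlist(x))))
  })
})
final_output <- rbindlist(resultdf0, fill = TRUE)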
Then there are three tests: apply with the new function, mclapply (not on Windows), and parLapply.
t1 <- system.time({
  resultdf1 <- apply(DF_1, 1, # 1 means rows
    FUN = function(x){
      grid_function_2(train_data_new, x)
    }
  )
})
final_output2 <- rbindlist(resultdf1, fill = TRUE)
# mclapply relies on forking, so this cannot be run on Windows
t2 <- system.time({
  ncores <- detectCores()
  resultdf1 <- mclapply(1:nrow(DF_1), function(i){
    grid_function_2(train_data_new, DF_1[i, ])
  },
  mc.cores = ncores - 1L)
})
final_output3 <- rbindlist(resultdf1, fill = TRUE)
t3 <- system.time({
  ncores <- detectCores()
  cl <- makePSOCKcluster(ncores - 1L) # reserve 1 cpu core
  # PSOCK workers start with an empty workspace: export the data and the function,
  clusterExport(cl, c("train_data_new", "DF_1", "grid_function_2"))
  # and attach the required packages on every worker
  clusterEvalQ(cl, library(dplyr))
  clusterEvalQ(cl, library(data.table))
  resultdf1 <- parLapply(
    cl = cl,
    X = 1:nrow(DF_1),
    function(i){
      grid_function_2(train_data_new, DF_1[i, ])
    }
  )
  # when finished
  stopCluster(cl)
})
final_output4 <- rbindlist(resultdf1, fill = TRUE)
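If the per-row workloads were uneven, the load-balanced parLapplyLB() could be tried as a drop-in replacement for the parLapply() call above (it has to run before stopCluster(cl)); a sketch:
# hypothetical variant: schedules rows dynamically instead of in fixed chunks
resultdf_lb <- parLapplyLB(
  cl = cl,
  X = 1:nrow(DF_1),
  fun = function(i){
    grid_function_2(train_data_new, DF_1[i, ])
  }
)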
Like I said above, there are NA value mismatches when comparing the original final_output with the new final_output2.
all.equal(final_output, final_output2)
#[1] "Column 'b': 'is.NA' value mismatch: 0 in current 243 in target"
identical(final_output2, final_output3)
#[1] TRUE
identical(final_output2, final_output4)
#[1] TRUE
As for the timings, remove t2 if the code is to be run on Windows. The fastest are t2 and t3, faster by around an order of magnitude.
rbind(t0, t1, t2, t3)