
furrr package in R doesn't keep spreading the jobs across all cores?


My task is to compute cosine dissimilarities.

Given a data frame of user observations, I compute the cosine dissimilarity between each pair of rows.

Long story short, I am using the furrr::future_map2_dfr function to spread the calculations across all the cores I have.

For some reason, when some cores go idle while others are still working hard, the remaining work doesn't get redistributed to the free cores.

For example:

Here is the starting point:

[Screenshot: CPU utilization at the start of the run]

Now it's in the middle of the calculation:

[Screenshot: CPU utilization mid-calculation; several cores are idle]

Why don't cores 1, 2, 5, 6, 8, 11, 12 and 15 participate and take a share of the remaining jobs?

The same thing happens with other calculations.

Am I missing a furrr setting that would change this behavior?

P.S.

Right now there are 5 cores working "hard", and for some reason furrr doesn't spread their work across all 16 cores to finish faster.

Functions (these assume the following packages are attached):

library(dplyr)
library(data.table)
library(stringr)
library(future)
library(furrr)

dissimilarity_wrapper <- function(n_users, 
                                  train_data, 
                                  train_data_std, 
                                  test_data, 
                                  std_thresh = 0.5) {

  # NOTE:
  # n_users must be set to the maximum user_id in order to make this
  # function work properly.

  # Generating the options:

  user_combinations <- expand.grid(i = seq_len(n_users),
                                   j = seq_len(n_users))

  plan(strategy = multicore)

  expand_grid_options <- furrr::future_map2_dfr(.x = user_combinations$i,
                                                .y = user_combinations$j,
                                                function(x, y) { 
                                                  expand.grid(test_idx = which(test_data$user_id == x),
                                                              train_idx = which(train_data$user_id == y))})

  drop <- c("user_id", "row_num", 
            "obs_id", "scroll_id", 
            "time_stamp", "seq_label", 
            "scroll_length")

  test <- test_data[expand_grid_options$test_idx, !names(test_data) %in% drop]

  train <- train_data[expand_grid_options$train_idx, !names(train_data) %in% drop]

  train_std <- train_data_std[expand_grid_options$train_idx, ]

  # Calculate the different dissimilarities (D's):

  # Manhattan distance with each feature scaled by its per-user train std:
  D_manhattan_scaled <- (abs(test - train) / train_std) %>% rowSums()

  # Cosine dissimilarity per row pair: 1 - <test, train> / (||test|| * ||train||):
  D_cosinus <- 1 - (rowSums(test * train) / (sqrt(rowSums(test^2) * rowSums(train^2))))

  # Clamp very small stds to 1 before rescaling:
  train_std[train_std < std_thresh] <- 1

  D_manhattan_scaled_adj_std <- (abs(test - train) / train_std) %>% rowSums()

  # Plain (unscaled) Manhattan distance:
  D_manhattan <- (abs(test - train)) %>% rowSums()

  return(expand_grid_options %>%
           dplyr::mutate(
             D_manhattan_scaled = D_manhattan_scaled,
             D_cosinus = D_cosinus,
             D_manhattan_scaled_adj_std = D_manhattan_scaled_adj_std,
             D_manhattan = D_manhattan,
             isSame = test_data[test_idx, ]$user_id == train_data[train_idx, ]$user_id))

}


train_test_std_split <- function(data, 
                                 train_size, 
                                 test_size, 
                                 feature_selection) {

  train_set <- data %>% 
    dplyr::ungroup() %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::group_by(user_id) %>%
    dplyr::filter(row_number() <= train_size) %>%
    dplyr::ungroup()

  if (length(feature_selection) > 1) {

    # Manual:
    # scaling_param_est <- scale_param_est_total_UG

    scaling_param_est <- train_set %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), funs(mean, sd))

  } else if (length(feature_selection) == 1) {

    scaling_param_est <- train_set %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), funs(mean, sd)) %>%
      dplyr::rename_at(vars("mean", "sd"), 
                       funs(paste(feature_selection, ., sep = "_")))

  }

  train_set <- train_set %>%
    dplyr::group_by(user_id) %>%
    dplyr::mutate_at(vars(feature_selection), scale) %>%
    data.table::as.data.table() %>%
    dplyr::ungroup() %>% 
    dplyr::as_tibble() %>%
    dplyr::arrange(time_stamp) 

  train_set_std <- train_set %>%
    dplyr::left_join(train_set %>%
                dplyr::group_by(user_id) %>%
                dplyr::summarize_at(feature_selection, sd) %>%
                dplyr::rename_at(vars(-"user_id"), 
                                 funs(paste0(feature_selection, "_sd"))), by = "user_id") %>%
    dplyr::ungroup() %>%
    dplyr::arrange(time_stamp) %>% 
    dplyr::select(matches("_sd"))

  test_set_unscaled <- data %>%
    dplyr::ungroup() %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::filter(!(obs_id %in% train_set$obs_id)) %>%
    dplyr::group_by(user_id) %>%
    dplyr::filter(row_number() <= test_size) %>%
    dplyr::ungroup()

  # Manual:
  # test_set_joined_with_scaling_params <- cbind(test_set_unscaled, scaling_param_est)
  test_set_unscaled_joined_with_scaling_params <- test_set_unscaled %>%
    dplyr::left_join(scaling_param_est, by = "user_id")

  test_set_unscaled_joined_with_scaling_params[, feature_selection] <-
    (test_set_unscaled_joined_with_scaling_params[, feature_selection] - 
       test_set_unscaled_joined_with_scaling_params[, paste0(feature_selection, "_mean")]) /
    test_set_unscaled_joined_with_scaling_params[, paste0(feature_selection, "_sd")]

  test_set <- test_set_unscaled_joined_with_scaling_params %>%
    dplyr::select(user_id, obs_id, scroll_id, 
                  time_stamp, row_num, scroll_length, 
                  feature_selection)


  # Validate:

  # intersect(unique(test_set$obs_id), unique(train_set$obs_id))

  # compute_std <- train_set %>%
  #   dplyr::group_by(user_id) %>%
  #   dplyr::select(-row_num) %>%
  #   dplyr::rename_at(vars(-user_id, -obs_id, -scroll_id, 
  #                         -time_stamp, -scroll_length), 
  #                    funs(paste(., "std", sep = "_"))) %>%
  #   dplyr::summarize_at(vars(matches("_std$")), funs(sd)) %>% 
  #   dplyr::ungroup()

  return(list("train_set" = train_set,
              "train_set_std" = train_set_std,
              "test_set" = test_set,
              "test_set_unscaled" = test_set_unscaled))

}

build_dissimilarity_rank <- function(n_users, 
                                     train_set, 
                                     train_set_std, 
                                     test_set, 
                                     D_type = "D_cosinus") {

  return(dissimilarity_wrapper(n_users, train_set, train_set_std, test_set) %>% 
           dplyr::mutate(train_user_id = train_set[train_idx, ]$user_id,
                         test_user_id = test_set[test_idx, ]$user_id) %>%
           dplyr::select(test_idx, 
                         train_user_id, 
                         test_user_id, 
                         train_idx, 
                         D_manhattan_scaled,
                         D_cosinus,
                         D_manhattan_scaled_adj_std,
                         D_manhattan,
                         isSame) %>%
           dplyr::group_by(test_idx, train_user_id) %>%
           dplyr::arrange(train_user_id, !!rlang::sym(D_type)) %>%
           dplyr::mutate(D_manhattan_rank = rank(D_manhattan),
                         D_manhattan_scaled_rank = rank(D_manhattan_scaled, ties.method = "first"),
                         D_cosinus_rank = rank(D_cosinus, ties.method = "first")) %>%
           dplyr::ungroup())

}

build_param_est <- function(dissimilarity_rank,
                            K,
                            D_type_rank = "D_manhattan_scaled") {

  return(dissimilarity_rank %>%
           dplyr::filter(isSame, (!!rlang::sym(paste0(D_type_rank, "_rank"))) == K) %>%
           dplyr::group_by(train_user_id) %>%
           dplyr::summarise_at(vars(D_manhattan_scaled,
                                    D_cosinus,
                                    D_manhattan_scaled_adj_std,
                                    D_manhattan),
                               funs(mean, median, sd, quantile(., probs = .9))) %>%
           dplyr::rename_at(vars(matches("_quantile")),
                            funs(str_replace(., "_quantile", "_percentile_90"))) %>%
           dplyr::rename_at(vars(matches("_sd")),
                            funs(str_replace(., "_sd", "_std")))
  )
}

build_dissimilarity_table <- function(dissimilarity_rank,
                                      param_est,
                                      K,
                                      i,
                                      D_type_rank = "D_manhattan_scaled",
                                      D_s = c("D_manhattan_scaled",
                                              "D_cosinus",
                                              "D_manhattan_scaled_adj_std",
                                              "D_manhattan")) {

  dissimilarity_table <- dissimilarity_rank %>%
    dplyr::filter(isSame, (!!rlang::sym(paste0(D_type_rank, "_rank"))) == K) %>%
    dplyr::left_join(param_est, by = c("train_user_id")) %>%
    dplyr::ungroup()

  dissimilarity_table[paste0(D_s, "_norm_standard")] <-
    (dissimilarity_table[D_s] - dissimilarity_table[paste0(D_s, "_mean")]) /
    dissimilarity_table[paste0(D_s, "_std")]

  dissimilarity_table[paste0(D_s, "_norm_median")] <-
    (dissimilarity_table[D_s] - dissimilarity_table[paste0(D_s, "_median")]) /
    (dissimilarity_table[paste0(D_s, "_percentile_90")] - dissimilarity_table[paste0(D_s, "_median")])

  # dplyr::mutate(experiment = i))

  return(dissimilarity_table)

}

k_fold_data_prepare <- function(df, min_scroll_len = 3) {

  # Given the data, split it by user id:

  return(df %>%
           dplyr::filter(scroll_length >= min_scroll_len) %>%
           dplyr::arrange(time_stamp) %>%
           dplyr::ungroup() %>%
           split(.$user_id))

}

k_fold_engine <- function(df, 
                          obs, 
                          n_users, 
                          K = 2, 
                          feature_selection,
                          D_type = "D_cosinus") {

  # Train - Test Split:

  train_set <- df %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::filter(obs_id != obs)

  if (length(feature_selection) > 1) {

   # Manual:  
   # scaling_param_est <- scale_param_est_total_UG
    scaling_param_est <- train_set %>%
      dplyr::arrange(time_stamp) %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), 
                          funs(mean, sd))

  } else if (length(feature_selection) == 1) {

    scaling_param_est <- train_set %>%
      dplyr::arrange(time_stamp) %>%
      dplyr::group_by(user_id) %>%
      dplyr::summarize_at(vars(feature_selection), funs(mean, sd)) %>%
      dplyr::rename_at(vars("mean", "sd"), 
                       funs(paste(feature_selection, ., sep = "_")))

  }

  train_set <- train_set %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::group_by(user_id) %>%
    dplyr::mutate_at(vars(feature_selection), scale) %>%
    as.data.table() %>%
    dplyr::ungroup() %>% 
    as_tibble()

  # Compute std for each train variable:

  train_set_std <- train_set %>%
    dplyr::left_join(train_set %>%
                       dplyr::group_by(user_id) %>%
                       dplyr::summarize_at(feature_selection, sd) %>%
                       dplyr::rename_at(vars(-"user_id"), 
                                        funs(paste0(feature_selection, "_sd"))), by = "user_id") %>%
    dplyr::select(matches("_sd"))

  test_set <- df %>%
    dplyr::filter(obs_id == obs)

  test_set_joined_with_scaling_params <- test_set %>%
    dplyr::left_join(scaling_param_est, by = "user_id") %>%
    dplyr::arrange(time_stamp)

  # Manual:
  # test_set_joined_with_scaling_params <- cbind(test_set, scaling_param_est)

  test_set_joined_with_scaling_params[, feature_selection] <-
    (test_set_joined_with_scaling_params[, feature_selection] - 
       test_set_joined_with_scaling_params[, paste0(feature_selection, "_mean")]) /
    test_set_joined_with_scaling_params[, paste0(feature_selection, "_sd")]

  test_set <- test_set_joined_with_scaling_params %>%
    dplyr::arrange(time_stamp) %>%
    dplyr::select(user_id, obs_id, scroll_id, 
                  time_stamp, row_num, scroll_length, 
                  feature_selection)

  # Compute std for each train variable:
  # compute_std <- train_set %>%
  #   dplyr::group_by(user_id) %>%
  #   dplyr::select(-row_num) %>%
  #   dplyr::rename_at(vars(-user_id, -obs_id, -scroll_id, 
  #                         -time_stamp, -scroll_length), 
  #                    funs(paste(., "std", sep = "_"))) %>%
  #   dplyr::summarize_at(vars(matches("_std$")), funs(sd)) %>% 
  #   dplyr::ungroup()
  # 
  # train_set_std <- dplyr::left_join(train_set, 
  #                            compute_std, 
  #                            by = "user_id") %>%
  #   dplyr::ungroup() %>%
  #   dplyr::select(matches("_std$"))

  # Compute the dissimilarities:

  return(build_dissimilarity_rank(n_users, 
                                  train_set, 
                                  train_set_std, 
                                  test_set,
                                  D_type))

}

k_fold_wrapper <- function(data_df, 
                           K = 2, 
                           D_type_rank = "D_cosinus",
                           feature_selection) {

  data_seqed <- k_fold_data_prepare(data_df)

  # Given the data split by user id, split it further by observation id:

  data_seqed_by_obs <- future_imap(data_seqed, ~split(., .$obs_id))

  # Get the observation ids of each sub-data frame:

  obs_ids <- future_imap(data_seqed_by_obs, ~as.integer(names(.)))

  # Feed the k-fold engine with the data split by user id and the observation ids:

  plan(strategy = multicore)

  # NOTE: the two future_map_dfr calls below are nested future calls (see
  # the PS in the solution below). Also, `D_type = D_type_rank` relies on
  # R's partial argument matching against the formal `D_type_rank`.

  dissimilarity_rank <- furrr::future_map_dfr(data_seqed, function(x) {

    furrr::future_map_dfr(obs_ids[[as.character(x$user_id[1])]],

                          function(df, obs, n_users, K,
                                   feature_selection, D_type_rank) {

                            k_fold_engine(df, obs, n_users, K,
                                          feature_selection, D_type_rank)
                          },

                          df = x,
                          n_users = x$user_id[1],
                          K = K,
                          feature_selection = feature_selection,
                          D_type = D_type_rank)
  })


  # Drop rows containing NA's (if any) and renumber the remaining rows:

  if (any(rowSums(is.na(dissimilarity_rank)) > 0)) {

    dissimilarity_rank <- dissimilarity_rank[rowSums(is.na(dissimilarity_rank)) == 0, ] %>%
      dplyr::mutate(row_num = row_number())

  }

  param_estimations <- dissimilarity_rank %>% 
    build_param_est(K, D_type_rank = D_type_rank)

  # Summarize and return final param estimation (average):

  # return(param_estimations %>% 
  #          dplyr::group_by(train_user_id) %>%
  #          summarize_at(vars(-"train_user_id"), mean))

  return(list("dissimilarity_rank" = dissimilarity_rank, 
              "param_estimations" = param_estimations))

}

The final script that causes the issues:

n_users <- max(unique(data$user_id))

train_df <- data %>%
  dplyr::group_by(user_id) %>%
  dplyr::filter(row_number() <= 50)

filter_users_low_amount_obs <- train_df %>%
  dplyr::group_by(user_id) %>%
  dplyr::summarise(n_obs = length(unique(obs_id))) %>%
  dplyr::arrange(n_obs) %>%
  dplyr::filter(n_obs >= 3) %>%
  dplyr::select(user_id)

train_df <- train_df %>%
  dplyr::filter(user_id %in% filter_users_low_amount_obs$user_id)

k_fold_d_rank_param_est <- k_fold_wrapper(train_df, K, D_type_rank = D_type, feature_selection)

dissimilarity_rank_1 <- k_fold_d_rank_param_est$dissimilarity_rank

param_est <- k_fold_d_rank_param_est$param_estimations

train_test_std_split_2 <- train_test_std_split(data, 
                                               train_size_2, 
                                               test_size = Inf,
                                               feature_selection)

dissimilarity_rank_2 <- build_dissimilarity_rank(n_users, 
                                                 train_test_std_split_2$train_set, 
                                                 train_test_std_split_2$train_set_std, 
                                                 train_test_std_split_2$test_set)

Solution

  • I believe the option you are missing is furrr's scheduling option. By default, your data is split into as many chunks as there are workers specified at the start of the future_map call, and each worker is assigned one chunk to work on. Once a worker is done with its chunk, it looks for another chunk and starts working on that. If no chunks are left, the worker goes idle.

    With the scheduling option you can specify how many chunks your data should be split into per worker. For example, .options = furrr_options(scheduling = 2) will create two chunks per worker, and workers that finish early will start working on another chunk (see the sketch below).

    For more information, see the vignette on chunking: https://davisvaughan.github.io/furrr/articles/articles/chunking.html
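    For illustration, here is a minimal sketch of how scheduling could be applied to the future_map2_dfr call in dissimilarity_wrapper (the value 4 is an arbitrary example, not a tuned setting):

    plan(multicore)

    expand_grid_options <- furrr::future_map2_dfr(
      .x = user_combinations$i,
      .y = user_combinations$j,
      function(x, y) {
        expand.grid(test_idx  = which(test_data$user_id == x),
                    train_idx = which(train_data$user_id == y))
      },
      # Split the inputs into 4 chunks per worker instead of 1, so workers
      # that finish a chunk early pick up a leftover one instead of idling:
      .options = furrr::furrr_options(scheduling = 4)
    )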

    PS: You have some nested future calls in your code; depending on your specified future::plan(), these will only slow the code down.
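    For example, one way to avoid the nesting in k_fold_wrapper is to parallelize only the outer loop and run the inner one sequentially; alternatively, future lets you declare a two-level topology explicitly. A sketch of both approaches, assuming 16 cores (the 4 x 4 split is illustrative, not tuned):

    library(future)
    library(furrr)
    library(purrr)

    # Option 1: only the outer call is parallel; the inner loop runs
    # sequentially inside each worker, so no nested futures are created.
    plan(multicore)

    dissimilarity_rank <- furrr::future_map_dfr(data_seqed, function(x) {
      purrr::map_dfr(obs_ids[[as.character(x$user_id[1])]], function(obs) {
        k_fold_engine(x, obs,
                      n_users = x$user_id[1],
                      K = K,
                      feature_selection = feature_selection,
                      D_type = D_type_rank)
      })
    })

    # Option 2: declare a two-level topology so nested futures map to real
    # workers (4 outer x 4 inner = 16 processes) rather than pure overhead:
    plan(list(tweak(multisession, workers = 4),
              tweak(multisession, workers = 4)))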