Tags: r, databricks, azure-databricks, sparkr

How to use an undefined list of variables as column names in a SparkDataFrame with SparkR?


I keep progressing in the world of SparkR, and I am now facing an issue that I am unable to resolve.

Working with SparkDataFrame manipulations, I may want to update some columns or aggregate others. I have learned how to do this on a case-by-case basis, i.e. column by column.

Let's take an example:

library(SparkR)
library(magrittr)

# Creating SDF
nb.row <- 10 
nb.col <- 10 
m <- matrix(runif(n=nb.row*nb.col, min = 0, max = 100), nb.row, nb.col)
sdf <- createDataFrame(data.frame(ID = 1:10, CODE = base::sample(LETTERS[1:2]), V = m))
  1. If I want to update columns, I can do something like:
sdf <- withColumn(sdf, "V_1", sdf$V_1 * 1000)
sdf <- withColumn(sdf, "V_2", sdf$V_2 * 1000)
  2. If I want to aggregate columns, I can do something like:
agg1 <- agg(groupBy(sdf, "CODE"), "SV_6" = sum(sdf$V_6), "SV_7" = sum(sdf$V_7))

My question is: how do I deal with these cases when I don't know the list of columns I want to work on in advance? (Easy in base R, this seems insurmountable to me in SparkR...)

  1. Back to the case of updating. I found something like:
list.var.1 <- paste0("V_", 1:5)
for (i in seq_along(list.var.1)) {
  sdf <- withColumn(sdf, list.var.1[i], sdf[[list.var.1[i]]] * 1000)
}

This gives me the expected result, but is it the simplest script? Is there nothing lighter or more "official"?

  2. Back to the case of aggregation. I found something like:
# Useful functions
# DFjoin: join two SparkDataFrames on a key column, keeping a single key column
DFjoin <- function(left_df, right_df, key = "key", join_type = "left"){
    left_df <- withColumnRenamed(left_df, key, "left_key")
    right_df <- withColumnRenamed(right_df, key, "right_key")
    result <- join(
        left_df, right_df,
        left_df$left_key == right_df$right_key,
        joinType = join_type)
    result <- withColumnRenamed(result, "left_key", key)
    result$right_key <- NULL
    return(result)
}

# sum_spark: aggregate one column per group and join the result back onto res
sum_spark <- function(res, df, gb, col) {
  Cols <- paste0("S", col)
  tmp <- agg(groupBy(df, gb), alias(sum(df[[col]]), Cols))
  DFjoin(res, tmp, gb)
}

# First step to create base SDF called res
res <- SparkR::select(sdf, sdf$CODE) %>% SparkR::distinct()

# Updating res in a for loop with join
list.var.2 <- paste0("V_", 6:7)  # the columns to aggregate, e.g. V_6 and V_7
for (i in seq_along(list.var.2)) {
  res <- sum_spark(res, sdf, "CODE", list.var.2[i])
}

This also gives me the expected result, but the script seems really heavy to me compared with base R. Am I wrong?

I can't find any more information on this, so any help is welcome!


Solution

  • You may refer to this answer on how to use lapply in conjunction with other SparkR functions to get what you want instead of using for loops. (A Reduce-based sketch for the update case is also shown at the end of this answer.)

    Below is one useful function that applies SparkR::agg to a list of columns and will serve your purpose:

    #' Apply SparkR aggregate function on list of columns
    #'
    #' This function acts as boilerplate to simplify the code for aggregating
    #' multiple columns given as a list, applying the SparkR::agg function to
    #' them.
    #'
    #' @param spark_df Spark dataframe (grouped or usual) on which some SparkR
    #'     aggregate function is to be applied
    #' @param agg_cols_list List of Spark Column objects, each wrapping an
    #'     aggregate function
    #'
    #' @examples \dontrun{
    #'   # sdf is a SparkR dataframe having numeric columns "a" & "b"
    #'   sdf <- SparkR::createDataFrame(data.frame(a = c(1, 2), b = c(1, 5)))
    #'   sparkr_agg_listargs(sdf,
    #'     lapply(c("a", "b"), function(x) sum(SparkR::column(x)))
    #'   )
    #' }
    sparkr_agg_listargs <- function(spark_df, agg_cols_list) {
      do.call(SparkR::agg, c(spark_df, agg_cols_list))
    }
    

    Use SparkR::alias to give the new columns the desired names, as in the example below.
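
    For example, here is a sketch applying the helper to the question's data (the "S" prefix and the columns V_6/V_7 are taken from the question; sparkr_agg_listargs is the function above):

    # Build aliased aggregate Columns from a character vector of names
    list.var.2 <- paste0("V_", 6:7)
    agg_cols <- lapply(list.var.2, function(x) {
      SparkR::alias(SparkR::sum(SparkR::column(x)), paste0("S", x))
    })
    # One row per CODE, with columns SV_6 and SV_7
    res <- sparkr_agg_listargs(SparkR::groupBy(sdf, "CODE"), agg_cols)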
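
    The update case can also be handled without an explicit for loop. A minimal sketch, assuming sdf and the V_1..V_5 columns from the question: base R's Reduce() folds SparkR::withColumn over the column names.

    # Fold withColumn over the column names, threading sdf through each step
    list.var.1 <- paste0("V_", 1:5)
    sdf <- Reduce(
      function(df, v) SparkR::withColumn(df, v, df[[v]] * 1000),
      list.var.1,
      init = sdf
    )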