Tags: r, apache-spark, dplyr, apache-spark-sql, sparklyr

Sparklyr/Dplyr - How to apply a user-defined function to each row of a Spark data frame and write the output of each row to a new column?


I have a spark_tbl containing 160+ columns.

Here is an example to show what the dataframe looks like:

Key  A  B  C  D  E  F  G .....Z

s1   0  1  0  1  1  0  1      0
s2   1  0  0  0  0  0  0      0
s3   1  1  0  0  0  0  0      0
s4   0  1  0  1  1  0  0      0

What I want to achieve is to create a new column based on the values in each column, like this:

Key  A  B  C  D  E  F  G .....Z  panel

s1   0  1  0  1  1  0  1      0  B,D,E,G
s2   1  0  0  0  0  0  0      0  A 
s3   1  1  0  0  0  0  0      0  A,B
s4   0  1  0  1  1  0  0      0  B,D,E

Check each column row-wise, append the column name to a string if the value is 1, and finally write that string to a column called panel.

My attempt at writing a user-defined function:

get_panel <- function(eachrow) {
  id <- ""
  row_list <- as.list(eachrow)
  for (i in seq_along(row_list)) {
    if (row_list[[i]] == "1") {
      if (id == "") {
        id <- columns[i + 1]  # `columns` holds the column names; i + 1 skips "Key"
      } else {
        id <- paste(id, columns[i + 1], sep = ",")  # sep = "," avoids paste()'s default spaces
      }
    }
  }
  return(id)
}

This works with a regular data frame using the apply function, roughly as in the sketch below.
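
For reference, a minimal sketch of that local usage (my assumption of how get_panel is invoked; `columns` is taken to be colnames(df), so columns[i + 1] lines up once the Key column is dropped):

columns <- colnames(df)                    # "Key", "A", "B", ..., "Z"
df$panel <- apply(df[, -1], 1, get_panel)  # drop Key so row positions match columns[i + 1]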

But how can this function be applied to a Spark DataFrame (tbl_spark)?


Solution

  • I think that @JasonAizkalns is on the right track. Starting with his example:

    library(dplyr)
    library(sparklyr)
    sc <- spark_connect(master = "local")
    
    
    # Build a reproducible example: matrix() fills by column, so the first
    # four values become the Key column, followed by 26 random 0/1 columns
    mat <- matrix(c(paste0("s", 1:4), as.numeric(sample(0:1, 4 * 26, TRUE))), ncol = 27)
    colnames(mat) <- c("Key", LETTERS[1:26])
    
    df <- data.frame(mat, stringsAsFactors = FALSE) %>%
      mutate_at(vars(-"Key"), as.numeric) %>%
      as_tibble()  # as_data_frame() is deprecated
    df
    
    dfs <- copy_to(sc, df, overwrite = TRUE)
    
    

    We can get there with a little rlang magic, building the panel column up one letter at a time:

    dfs <- dfs %>% mutate(panel = "")
    for (letter in LETTERS[1:26]) {
      # !!sym(letter) turns the string "A" into the column reference A; when the
      # value isn't 1, ifelse() yields NA (NULL in Spark), which concat_ws() skips
      dfs <- dfs %>% mutate(panel = concat_ws(",", panel, ifelse(!!sym(letter) == 1.0, yes = letter, no = NA)))
    }
    
    dfs %>% 
      mutate(panel = regexp_replace(panel, "^,", "")) %>% # strip the leading comma left by the initial empty panel
      select(Key, A:D, panel)
    

    This gives what I think you want:

    # Source: spark<?> [?? x 6]
      Key       A     B     C     D panel                           
    * <chr> <dbl> <dbl> <dbl> <dbl> <chr>                           
    1 s1        0     0     1     1 C,D,E,G,O,P,Q,U,Z              
    2 s2        1     0     0     1 A,D,G,K,L,M,N,Q,S,U,W          
    3 s3        0     1     0     0 B,E,L,M,O,Q,R,S,T,Y            
    4 s4        1     1     0     1 A,B,D,E,G,I,J,M,N,R,S,T,U,V,Y,Z
    

    The key here is the concat_ws Spark SQL (not R) function: it joins its arguments with the given separator and skips NULLs, which is why the letters whose value is not 1 drop out cleanly. See https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#concat_ws-java.lang.String-org.apache.spark.sql.Column...-
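
    You can verify that NULL-skipping behaviour through the same connection; a quick illustrative query (the values are made up, but concat_ws itself is documented Spark SQL):

    library(DBI)
    dbGetQuery(sc, "SELECT concat_ws(',', 'A', NULL, 'C') AS joined")
    #   joined
    # 1    A,C

    As an aside, if you would rather run an R function row-wise on the cluster, sparklyr also provides spark_apply(), which ships an R closure to each partition. This is only a minimal sketch, assuming R is available on the workers; a vectorized helper stands in for the original loop:

    result <- dfs %>%
      spark_apply(function(partition) {
        # each partition arrives as a plain R data frame
        letter_cols <- setdiff(colnames(partition), c("Key", "panel"))
        partition$panel <- apply(partition[letter_cols], 1, function(row) {
          paste(letter_cols[row == 1], collapse = ",")
        })
        partition
      })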