Search code examples
r | apache-spark | dplyr | sparkr | sparklyr

How to get column values (value.var in dcast) after pivoting using sdf_pivot() function


I am trying to dcast my Spark dataframe using the sdf_pivot() function. I want to
display the values of columns, like the value.var parameter in dcast() from the reshape2 package. Please look at the example below.

id <- c(1,1,1,1,1,2,2,2,3,3,3)
name <- c("A","B","C","D","E","A","B","C","D","E","F")
value <- c(1,2,3,1,1,2,3,1,1,2,3)
dt <- data.frame(id,name,value)
reshape2::dcast(dt,id~name,value.var = "value")

Output 1:

  id  A  B  C  D  E  F
1  1  1  2  3  1  1 NA
2  2  2  3  1 NA NA NA
3  3 NA NA NA  1  2  3

spark_dt <- copy_to(sc, dt)
sdf_pivot(spark_dt,id~name)

Output 2:

id     A     B     C     D     E     F
  <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1     1     1     1     1     1     1   NaN
2     3   NaN   NaN   NaN     1     1     1
3     2     1     1     1   NaN   NaN   NaN

It seems there is no value.var parameter in the sdf_pivot() function. I am new to Spark, and any suggestions would be much appreciated. Do I need to write a custom function to do this?

Note: I tried the following:

##Pivoting
cohort_paste <- function(gdf) {
  expr <- invoke_static(
    sc,
    "org.apache.spark.sql.functions",
    "paste",
    "value"
  )
  gdf %>% invoke("agg", expr, list())
}

It gives the following error:

Error: java.lang.IllegalArgumentException: invalid method paste for object org.apache.spark.sql.functions

I actually want to use the paste function.

I also tried with a numeric value column:
df <- tibble(
    id = c(rep(1, 9), rep(2, 9)),
    name = rep(rep(c("A", "B", "C"), each=3), 2),
    value = sample(10,18,replace=T)
)[sample(1:18, size=10), ]

spark_dt <- copy_to(sc, df, overwrite=TRUE)

collect_list <- function(gdf) {
    expr <- invoke_static(
        sc,
        "org.apache.spark.sql.functions",
        "collect_list",
        "value"
    )
    gdf %>% invoke("agg", expr, list())
}

sdf_pivot(spark_dt, id ~ name, fun.aggregate=collect_list) %>% 
    mutate_at(vars(-id), funs(concat_ws(" ", .)))

Error log:

Error: org.apache.spark.sql.AnalysisException: cannot resolve 'concat_ws(' ', sparklyr_tmp_79e15abf584.A)' due to data type mismatch: argument 2 requires (array or string) type, however, 'sparklyr_tmp_79e15abf584.A' is of array type.; line 1 pos 13; 'GlobalLimit 10 +- 'LocalLimit 10 +- 'Project [id#3038, concat_ws( , A#3156) AS A#3172, concat_ws( , B#3158) AS B#3173, concat_ws( , C#3160) AS C#3174] +- SubqueryAlias sparklyr_tmp_79e15abf584 +- Aggregate [id#3038], [id#3038, collect_list(if ((name#3039 = A)) value#3040 else cast(null as int), 0, 0) AS A#3156, collect_list(if ((name#3039 = B)) value#3040 else cast(null as int), 0, 0) AS B#3158, collect_list(if ((name#3039 = C)) value#3040 else cast(null as int), 0, 0) AS C#3160] +- Project [id#3038, name#3039, value#3040] +- SubqueryAlias df +- Relation[id#3038,name#3039,value#3040] csv


Solution

  • This failed because paste is not a Spark function and you cannot execute R code in this context.

    You can try something like this:

    library(dplyr)
    library(sparklyr)
    
    sc <- spark_connect("local[8]")
    set.seed(1)
    
    df <- tibble(
      id = c(rep(1, 9), rep(2, 9)),
      name = rep(rep(c("A", "B", "C"), each=3), 2),
      value = sample(letters, size=18)
    )[sample(1:18, size=10), ]
    
    spark_dt <- copy_to(sc, df, overwrite=TRUE)
    
    collect_list <- function(gdf) {
      expr <- invoke_static(
        sc,
        "org.apache.spark.sql.functions",
        "collect_list",
        "value"
      )
      gdf %>% invoke("agg", expr, list())
    }
    
    sdf_pivot(spark_dt, id ~ name, fun.aggregate=collect_list) %>% 
      mutate_at(vars(-id), funs(concat_ws(" ", .)))
    
    #  # Source:   lazy query [?? x 4]
    #  # Database: spark_connection
    #       id A     B     C    
    #    <dbl> <chr> <chr> <chr>
    #  1  1.00 j g   u e   w    
    #  2  2.00 b c   v x   f  
    

    You can also use window functions:

    first <- function(gdf) {
      expr <- invoke_static(
        sc,
        "org.apache.spark.sql.functions",
        "first",
        "value"
      )
      gdf %>% invoke("agg", expr, list())
    }
    
    
    spark_dt %>% 
      group_by(id, name) %>% 
      arrange(value) %>% 
      mutate(i = row_number()) %>% 
      mutate(name = concat_ws("_", name,  i)) %>% 
      select(-i) %>% sdf_pivot(id ~ name, first)
    
    # # Source:   table<sparklyr_tmp_1ba404d8f51> [?? x 8]
    # # Database: spark_connection
    #      id A_1   A_2   A_3   B_1   B_2   B_3   C_1  
    #   <dbl> <chr> <chr> <chr> <chr> <chr> <chr> <chr>
    # 1  1.00 m     NA    NA    f     n     v     d    
    # 2  2.00 b     x     y     h     r     NA    NA