I have a spark_tbl containing 160+ columns.
Here is a simplified example of what the data frame looks like:
Key A B C D E F G .....Z
s1 0 1 0 1 1 0 1 0
s2 1 0 0 0 0 0 0 0
s3 1 1 0 0 0 0 0 0
s4 0 1 0 1 1 0 0 0
What I want to achieve is to create a new column based on the values in each column, like:
Key A B C D E F G .....Z panel
s1 0 1 0 1 1 0 1 0 B,D,E,G
s2 1 0 0 0 0 0 0 0 A
s3 1 1 0 0 0 0 0 0 A,B
s4 0 1 0 1 1 0 0 0 B,D,E
Check each column row-wise, append the column name to a string if the value is 1, and finally write the result to a column called panel.
My attempt at writing a user defined function:
get_panel <- function(eachrow) {
  id <- ""
  row_list <- as.list(eachrow)
  for (i in seq_along(row_list)) {
    if (row_list[i] == "1") {
      if (id == "") {
        id <- columns[i + 1]  # `columns` holds colnames(df); + 1 skips the Key column
      } else {
        id <- paste(id, columns[i + 1], sep = ",")
      }
    }
  }
  return(id)
}
This works with a regular data frame using the apply function. But how can I apply this function to a Spark DataFrame (tbl_spark)?
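For reference, here is a hedged sketch of how the attempt above runs locally with apply on a plain data frame (the toy df, its values, and the `columns` vector are assumptions for illustration; this simplified version drops the as.list step but keeps the same logic):

```r
# Minimal local sketch, assuming `columns` is colnames(df) so that
# columns[i + 1] skips the leading Key column.
df <- data.frame(
  Key = c("s1", "s2"),
  A = c(0, 1), B = c(1, 0), C = c(0, 0),
  stringsAsFactors = FALSE
)
columns <- colnames(df)

get_panel <- function(eachrow) {
  id <- ""
  for (i in seq_along(eachrow)) {
    # numeric 1 compares equal to "1" after R's implicit coercion
    if (eachrow[i] == "1") {
      id <- if (id == "") columns[i + 1] else paste0(id, ",", columns[i + 1])
    }
  }
  id
}

# apply() passes each row of df[, -1] (everything but Key) to get_panel
df$panel <- apply(df[, -1], 1, get_panel)
df$panel  # "B" "A"
```

This confirms the row-wise logic on ordinary R data, but apply-style row iteration is exactly what does not translate to a tbl_spark, which is why the answer below rebuilds the logic from Spark SQL functions instead.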
I think that @JasonAizkalns is on the right track. Starting with his example:
library(dplyr)
library(sparklyr)
sc <- spark_connect(master = "local")
mat <- matrix(c(paste0("s", 1:4), as.numeric(sample(0:1, 4 * 26, TRUE))), ncol = 27)
colnames(mat) <- c("Key", LETTERS[1:26])
df <- data.frame(mat, stringsAsFactors = FALSE) %>%
  mutate_at(vars(-"Key"), as.numeric) %>%
  as_data_frame()
df
dfs <- copy_to(sc, df, overwrite = TRUE)
We can get there using a little rlang magic.
dfs <- dfs %>% mutate(panel = "")
for (letter in LETTERS[1:26]) {
  dfs <- dfs %>%
    mutate(panel = concat_ws(",", panel, ifelse(!!sym(letter) == 1.0, yes = letter, no = NA)))
}
dfs %>%
  mutate(panel = regexp_replace(panel, "^,", "")) %>%  # remove leading comma
  select(Key, A:D, panel)
This gives what I think you want:
# Source: spark<?> [?? x 6]
Key A B C D panel
* <chr> <dbl> <dbl> <dbl> <dbl> <chr>
1 s1 0 0 1 1 C,D,E,G,O,P,Q,U,Z
2 s2 1 0 0 1 A,D,G,K,L,M,N,Q,S,U,W
3 s3 0 1 0 0 B,E,L,M,O,Q,R,S,T,Y
4 s4 1 1 0 1 A,B,D,E,G,I,J,M,N,R,S,T,U,V,Y,Z
The key here is the concat_ws Spark SQL (not R) function, which joins its arguments with the given separator and skips NULL values, so the columns where ifelse returned NA simply drop out. See https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#concat_ws-java.lang.String-org.apache.spark.sql.Column...-
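As a local sanity check on the result, the whole computation can be mimicked in plain R without a Spark connection. This is a hypothetical verification sketch, not sparklyr code; the seed and matrix shape are assumptions matching the example above:

```r
# Pure-R analogue of the Spark pipeline, for checking results locally.
set.seed(1)
mat <- matrix(sample(0:1, 4 * 26, TRUE), ncol = 26,
              dimnames = list(paste0("s", 1:4), LETTERS[1:26]))

# For each row, keep the names of the columns that equal 1 and
# collapse them with commas, mirroring what concat_ws builds in Spark.
panel <- apply(mat == 1, 1, function(keep) paste(LETTERS[1:26][keep], collapse = ","))
panel["s2"]  # comma-separated names of the columns that are 1 in row s2
```

Comparing this vector against the panel column that comes back from Spark is a quick way to convince yourself the loop-plus-concat_ws approach is doing the right thing.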