I've unfortunately been given a very messy and very large table (csv) by a client. It's in wide format :'(
As an example the columns are:
Name, Date, Usage_Hr1, Usage_Hr2, ..., Usage_Hr24, ... lots more columns
I would normally just load the .csv into R and use gather from the tidyr package, but the data is too big. I've considered loading the data into sparklyr, but there is no gather function in sparklyr yet...
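For a small in-memory sample, the reshape I'm after would just be (column names illustrative):

library(tidyr)

# Toy slice of the client table
wide <- data.frame(
  Name = c("x", "y"),
  Date = c("2017-05-01", "2017-05-01"),
  Usage_Hr1 = c(0.1, 0.3),
  Usage_Hr2 = c(0.2, 0.4)
)

# One row per Name/Date/hour instead of one column per hour
gather(wide, key = "Hour", value = "Usage", Usage_Hr1:Usage_Hr2)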
So my question is, once I've COPYed my table into Cassandra (with the PRIMARY KEY set to (Name, Date)), how can I convert these columns to long format? Am I just out of luck? I'm not a DB guy, btw, so I have no idea here.
Note that I'm using the latest version of Cassandra, and my current table is about 10 million rows.
In Spark you can use the explode function for this, but compared to the supported APIs, doing it in sparklyr is a bit involved.
Initialization and example data:
library(sparklyr)
library(dplyr)
library(stringi)

sc <- spark_connect("local[*]")

df <- data.frame(A = c("a", "b", "c"), B = c(1, 3, 5), C = c(2, 4, 6))
sdf <- copy_to(sc, df, overwrite = TRUE)
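For orientation, here is the same explode-an-array-of-structs trick written as raw Spark SQL through the DBI interface (copy_to registered the temp view as df); the melt function below assembles exactly this kind of plan:

library(DBI)

# Melt B and C directly in Spark SQL against the "df" temp view
dbGetQuery(sc, "
  SELECT A, col.key, col.value
  FROM (
    SELECT A, explode(array(
      named_struct('key', 'B', 'value', B),
      named_struct('key', 'C', 'value', C)
    )) AS col
    FROM df
  ) exploded
")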
Helper function:
#' Given a function name, return an R wrapper for the corresponding
#' static method on org.apache.spark.sql.functions (note: captures sc
#' from the enclosing environment)
sqlf <- function(f) function(x, ...) {
  invoke_static(sc, "org.apache.spark.sql.functions", f, x, ...)
}
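Each wrapper just returns a function that yields a reference to a JVM Column object rather than an R value, e.g.:

# sqlf("lit") wraps org.apache.spark.sql.functions.lit; the result is a
# reference to a JVM Column living on the Spark side
key_col <- sqlf("lit")("B")
invoke(key_col, "toString")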
Melt function:
#' @param df tbl_spark
#' @param sc spark_connection
#' @param id_vars id columns
#' @param value_vars value columns (defaults to all non-id columns)
#' @param var_name name of the key column in the output
#' @param value_name name of the value column in the output
#'
melt <- function(df, sc, id_vars, value_vars = NULL,
                 var_name = "key", value_name = "value") {
  # Melt everything that isn't an id column by default
  if (is.null(value_vars)) {
    value_vars <- setdiff(colnames(df), id_vars)
  }

  # Alias for the output view
  alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep = "_")

  # Get the underlying JVM DataFrame
  jdf <- spark_dataframe(df)

  # Combine columns into array<struct<key,value>> and explode
  exploded <- sqlf("explode")(sqlf("array")(lapply(value_vars, function(x) {
    key <- sqlf("lit")(x) %>% invoke("alias", var_name)
    value <- sqlf("col")(x) %>% invoke("alias", value_name)
    sqlf("struct")(list(key, value))
  })))

  # Expand struct<..., struct<key, value>> into struct<..., key, value>
  exprs <- lapply(
    c(id_vars, paste("col", c(var_name, value_name), sep = ".")),
    sqlf("col"))

  # Explode, flatten, and register the result as a temp view
  jdf %>%
    invoke("withColumn", "col", exploded) %>%
    invoke("select", exprs) %>%
    invoke("createOrReplaceTempView", alias)

  dplyr::tbl(sc, alias)
}
Example usage:
melt(sdf, sc, "A", c("B", "C"))
## Source: query [6 x 3]
## Database: spark connection master=local[*] app=sparklyr local=TRUE
##
## # A tibble: 6 x 3
## A key value
## <chr> <chr> <dbl>
## 1 a B 1
## 2 a C 2
## 3 b B 3
## 4 b C 4
## 5 c B 5
## 6 c C 6
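Applied to the table from the question (column names assumed from the example schema; usage_sdf and the path are hypothetical), the call would look something like:

# Read the client CSV straight into Spark instead of R
usage_sdf <- spark_read_csv(sc, name = "usage", path = "path/to/client.csv")

usage_long <- melt(
  usage_sdf, sc,
  id_vars = c("Name", "Date"),
  value_vars = paste0("Usage_Hr", 1:24),
  var_name = "Hour", value_name = "Usage"
)

Because the result is registered as a temporary view and handed back through dplyr::tbl, any further filtering or aggregation stays lazy and runs in Spark, so the 10 million rows never have to fit into R's memory.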