Tags: r, database, apache-spark, cassandra, sparklyr

Convert wide table to long format in Cassandra


I've unfortunately been given a very messy and very large table (csv) by a client. It's in wide format :'(

As an example the columns are:

Name, Date, Usage_Hr1, Usage_Hr2, ..., Usage_Hr24, ... lots more columns

I would normally just load the .csv into R and use gather from the tidyr package, but the data is too big. I've considered loading the data into sparklyr, but there is no gather function in sparklyr yet...

So my question is: once I've COPYed my table into Cassandra (with the PRIMARY KEY set to Name and Date), how can I convert these columns to long format? Am I just out of luck? I'm not a DB guy btw so I have no idea here.

Note I'm using the latest version of Cassandra and my current table is about 10 million rows.


Solution

  • In Spark you can use the explode function, but compared to the natively supported APIs, doing this in sparklyr is a bit involved.

    Initialization and example data:

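    library(sparklyr)  # spark_connect, invoke, spark_dataframe, ...
    library(dplyr)     # %>%, copy_to, tbl
    library(stringi)   # stri_rand_strings
    
    sc <- spark_connect("local[*]")
    df <- data.frame(A = c("a", "b", "c"), B = c(1, 3, 5), C = c(2, 4, 6))
    sdf <- copy_to(sc, df, overwrite = TRUE)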
    

    Helper functions:

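    #' Given the name of a function in org.apache.spark.sql.functions,
    #' return an R wrapper that calls it on the JVM.
    #' Note: relies on the connection `sc` defined above.
    sqlf <- function(f) function(x, ...) {
      invoke_static(sc, "org.apache.spark.sql.functions", f, x, ...)
    }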
    

    Melt function:

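    #' @param df tbl_spark
    #' @param sc spark_connection
    #' @param id_vars id columns
    #' @param value_vars columns to unpivot (defaults to all non-id columns)
    #' @param var_name name of the output column holding the original column names
    #' @param value_name name of the output column holding the values
    #'
    melt <- function(df, sc, id_vars, value_vars = NULL, 
        var_name = "key", value_name = "value") {
      # Alias for the output view
      alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep = "_")
      # Without value_vars the array below would be empty, so fall back
      # to every column that is not an id column
      if (is.null(value_vars)) value_vars <- setdiff(colnames(df), id_vars)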
      # Get session and JVM object
      spark <- spark_session(sc)
      jdf <- spark_dataframe(df)
    
      # Convert characters to JVM Columns
      j_id_vars <- lapply(id_vars, sqlf("col"))
    
      # Combine columns into array<struct<key,value>> and explode
      exploded <- sqlf("explode")(sqlf("array")(lapply(value_vars, function(x) {
        key <- sqlf("lit")(x) %>% invoke("alias", var_name)
        value <- sqlf("col")(x) %>% invoke("alias", value_name)
        sqlf("struct")(list(key, value))
      })))
    
      # expand struct<..., struct<key, value>> into struct<..., key, value>
      exprs <- lapply(
        c(id_vars, paste("col", c(var_name, value_name), sep = ".")),
        sqlf("col"))
    
      # Explode and register as temp table
      jdf %>% 
        invoke("withColumn", "col", exploded) %>% 
        invoke("select", exprs) %>% 
        invoke("createOrReplaceTempView", alias)
    
      dplyr::tbl(sc, alias)
    }
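
The "array of structs, then explode" step is easier to follow on a local data frame. Here is a minimal base R sketch of the same reshaping, with no Spark involved. `melt_local` is a hypothetical helper written only for illustration; it is not part of sparklyr and is not meant for large data.

```r
# Same toy data as in the answer, kept local
df <- data.frame(A = c("a", "b", "c"), B = c(1, 3, 5), C = c(2, 4, 6),
                 stringsAsFactors = FALSE)

# Hypothetical helper: mimics array<struct<key,value>> + explode in base R
melt_local <- function(df, id_vars, value_vars,
                       var_name = "key", value_name = "value") {
  rows <- lapply(seq_len(nrow(df)), function(i) {
    # The "array of structs" for row i: one (key, value) pair per value column
    structs <- data.frame(
      key = value_vars,
      value = unlist(df[i, value_vars], use.names = FALSE),
      stringsAsFactors = FALSE
    )
    names(structs) <- c(var_name, value_name)
    # The "explode": repeat the id columns once per pair
    ids <- df[rep(i, length(value_vars)), id_vars, drop = FALSE]
    cbind(ids, structs)
  })
  out <- do.call(rbind, rows)
  rownames(out) <- NULL
  out
}

long <- melt_local(df, "A", c("B", "C"))
print(long)
```

Each input row turns into one output row per value column, so the three-row example should come out as six rows with columns A, key and value.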
    

    Example usage:

    melt(sdf, sc, "A", c("B", "C"))
    
    ## Source:   query [6 x 3]
    ## Database: spark connection master=local[*] app=sparklyr local=TRUE
    ## 
    ## # A tibble: 6 x 3
    ##       A   key value
    ##   <chr> <chr> <dbl>
    ## 1     a     B     1
    ## 2     a     C     2
    ## 3     b     B     3
    ## 4     b     C     4
    ## 5     c     B     5
    ## 6     c     C     6
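
For the table in the question, the call might look like the sketch below. It assumes sparklyr and dplyr are loaded, `sc` is an open connection and `melt()` is defined as above. The wrapper name `melt_usage`, the output column names `hour` and `usage`, and the CSV path are all placeholders, and the sketch only covers the 24 `Usage_Hr` columns named in the question.

```r
# Column names as given in the question: Usage_Hr1 ... Usage_Hr24
usage_cols <- paste0("Usage_Hr", 1:24)

# Hypothetical wrapper around melt() from the answer above.
# `tbl` is a tbl_spark holding the wide table, `sc` the spark_connection.
melt_usage <- function(tbl, sc) {
  melt(tbl, sc,
       id_vars = c("Name", "Date"),
       value_vars = usage_cols,
       var_name = "hour",
       value_name = "usage")
}

# Not run here: requires a Spark connection and the real file.
# The path is a placeholder, not a real location.
# usage_wide <- spark_read_csv(sc, name = "usage_wide", path = "path/to/client.csv")
# usage_long <- melt_usage(usage_wide, sc)
```

Any further wide columns would need to be added to `value_vars` (or to `id_vars`) in the same way; columns listed in neither are dropped from the result.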