Importing cassandra table into spark via sparklyr - possible to select only some columns?


I've been working with sparklyr to bring large Cassandra tables into Spark, register them with R, and run dplyr operations on them.

I have been successfully importing Cassandra tables with code that looks like this:

# import cassandra table into spark

cass_df <- sparklyr:::spark_data_read_generic(
  sc, "org.apache.spark.sql.cassandra", "format", 
  list(keyspace = "cass_keyspace", table = "cass_table")
  ) %>% 
  invoke("load")


# register table in R

cass_tbl <- sparklyr:::spark_partition_register_df(
  sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE
)

Some of these Cassandra tables are very large (> 8.5bn rows) and take a while to import/register, and some cause memory overruns, even with 6 nodes running a total of 60 cores and 192 GB RAM. However, I typically need only a few of the columns from each Cassandra table.

My questions are:

  1. Is it possible to filter the Cassandra table on import/registration so that only some columns are imported, or so that it is filtered on the primary key (i.e. by passing SQL / CQL type queries such as SELECT name FROM cass_table WHERE id = 5)?
  2. Where would such a query go in the above code, and what form does the syntax take?

I have tried adding such a query as an additional option in the options list, i.e.:

list(. . . , select = "id")

as well as invoking it as a separate pipe before %>% invoke("load"), i.e.:

invoke("option", "select", "id") %>%

# OR

invoke("option", "query", "select id from cass_table") %>%

But these do not work. Any suggestions?


Solution

  • You can skip the eager cache and select only the columns of interest:

    session <- spark_session(sc)
    
    # Some columns to select
    cols <- list("x", "y", "z")
    
    cass_df <- session %>% 
      invoke("read") %>% 
      invoke("format", "org.apache.spark.sql.cassandra") %>% 
      # The connector needs both keyspace and table; "cass_table" is the
      # table name used in the question
      invoke("options", as.environment(list(keyspace = "test", table = "cass_table"))) %>% 
      invoke("load") %>% 
      # We use select(col: String, cols: String*), so the first column
      # has to be passed separately. cols[-1] is an empty list when only
      # one column is requested, which is what the varargs argument needs.
      invoke("select", cols[[1]], cols[-1]) %>%
      # Standard lazy cache if you need one
      invoke("cache")
    

    If you use predicates that can significantly reduce the amount of fetched data, set the pushdown option to "true" (the default) and apply the filter before caching.
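
    A minimal sketch of that advice, assuming `sc` is an open sparklyr connection with the Cassandra connector available. The helper name `read_filtered` is hypothetical, and the keyspace, table, and predicate in the usage line are the placeholders from the question. Whether the predicate is actually pushed down to Cassandra depends on the connector and on the column being filtered.

    ```r
    library(sparklyr)

    # Hypothetical helper: read a Cassandra table lazily, filter, then cache.
    # `condition` is a Spark SQL expression string, e.g. "id = 5".
    read_filtered <- function(sc, keyspace, table, condition) {
      spark_session(sc) %>%
        invoke("read") %>%
        invoke("format", "org.apache.spark.sql.cassandra") %>%
        invoke("option", "keyspace", keyspace) %>%
        invoke("option", "table", table) %>%
        # Pushdown is on by default; it is set here only to make it explicit
        invoke("option", "pushdown", "true") %>%
        invoke("load") %>%
        # Filter before caching so only the matching rows are cached
        invoke("filter", condition) %>%
        invoke("cache")
    }

    # Usage (needs a live connection):
    # cass_df <- read_filtered(sc, "cass_keyspace", "cass_table", "id = 5")
    ```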

    If you want to pass a more complex query, register a temporary view and use the sql method:

    session %>%
      invoke("read") %>% 
      ...
      invoke("load") %>% 
      invoke("createOrReplaceTempView", "some_name")
    
    cass_df <- session %>% 
      invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
      invoke("cache")
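
    The `cass_df` objects above are references to Spark DataFrames, not dplyr tables. Since the question's goal is to run dplyr operations, one possible way to bridge the two is sparklyr's `sdf_register()`. This is a sketch only: the helper name `as_dplyr_tbl` and the table name "cass_subset" are hypothetical.

    ```r
    library(sparklyr)
    library(dplyr)

    # Hypothetical helper: expose a Spark DataFrame reference as a dplyr table.
    # `cass_df` is assumed to be the result of one of the pipelines above.
    as_dplyr_tbl <- function(cass_df, name = "cass_subset") {
      sdf_register(cass_df, name)
    }

    # Usage (needs a live connection):
    # cass_tbl <- as_dplyr_tbl(cass_df)
    # cass_tbl %>% filter(id == 5) %>% select(id)
    ```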