I've been working with sparklyr to bring large Cassandra tables into Spark, register them with R, and run dplyr operations on them.
I have been successfully importing Cassandra tables with code that looks like this:
# import cassandra table into spark
cass_df <- sparklyr:::spark_data_read_generic(
sc, "org.apache.spark.sql.cassandra", "format",
list(keyspace = "cass_keyspace", table = "cass_table")
) %>%
invoke("load")
# register table in R
cass_tbl <- sparklyr:::spark_partition_register_df(
sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE
)
Some of these Cassandra tables are pretty large (> 8.5bn rows) and take a while to import/register, and some lead to memory overruns, even with 6 nodes running a total of 60 cores and 192 GB RAM. However, I typically need only a few of the columns from each Cassandra table.
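My question is: is it possible to filter the Cassandra table on import so that only some columns are loaded (for example with SQL/CQL-type queries such as SELECT name FROM cass_table WHERE id = 5)?
I have tried adding such a query as an additional option in the options list, i.e.: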
list(. . . , select = "id")
as well as invoking it as a separate step in the pipe before %>% invoke("load"), i.e.:
invoke("option", "select", "id") %>%
# OR
invoke("option", "query", "select id from cass_table") %>%
But these do not work. Any suggestions?
You can skip the eager cache and select only the columns of interest:
session <- spark_session(sc)
# Some columns to select
cols <- list("x", "y", "z")
cass_df <- session %>%
invoke("read") %>%
invoke("format", "org.apache.spark.sql.cassandra") %>%
invoke("options", as.environment(list(keyspace="test"))) %>%
invoke("load") %>%
# We use select(col: String, cols: String*), so the first column
# has to be passed separately. If you want only one column, the third
# argument has to be an empty list; cols[-1] handles that case
invoke("select", cols[[1]], cols[-1]) %>%
# Standard lazy cache if you need one
invoke("cache")
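The split between the first column and the remaining ones can be wrapped in a small helper. This is only a sketch in plain R: select_args is a hypothetical name, not part of sparklyr, and it does nothing more than prepare the two arguments that select expects.

```r
# Hypothetical helper (not part of sparklyr): split column names into the
# two arguments expected by Dataset.select(col: String, cols: String*).
select_args <- function(cols) {
  cols <- as.list(cols)
  stopifnot(length(cols) >= 1)
  # cols[-1] is an empty list when only one column is requested
  list(first = cols[[1]], rest = cols[-1])
}

args <- select_args(c("x", "y", "z"))

# With a loaded DataFrame reference `df`, the call would then be:
# df %>% invoke("select", args$first, args$rest)
```

With a single column, args$rest is an empty list, which is what the varargs parameter needs.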
If you use predicates that can significantly reduce the amount of fetched data, set the pushdown option to "true" (the default) and use filter before caching.
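As a rough sketch of the filter-before-cache order, something like the following should work. It assumes sc is an open connection with the Cassandra connector available, that invoke resolves the String overload of filter, and it reuses the keyspace, table and predicate from the question. read_filtered is a hypothetical wrapper name, not a sparklyr function.

```r
library(sparklyr)

# Hypothetical wrapper (not part of sparklyr). Assumes `sc` is an open
# spark_connect() connection with the Cassandra connector on the classpath.
read_filtered <- function(sc, keyspace, table, predicate) {
  spark_session(sc) %>%
    invoke("read") %>%
    invoke("format", "org.apache.spark.sql.cassandra") %>%
    invoke("options", as.environment(list(
      keyspace = keyspace,
      table = table,
      pushdown = "true"  # the default, spelled out here for clarity
    ))) %>%
    invoke("load") %>%
    # Dataset.filter(conditionExpr: String), applied before cache so the
    # connector can push the predicate down where it supports that
    invoke("filter", predicate) %>%
    invoke("cache")
}

# Example call, using the names from the question:
# cass_df <- read_filtered(sc, "cass_keyspace", "cass_table", "id = 5")
```

Whether a given predicate is actually pushed down depends on the table's keys and indexes, so it is worth checking the physical plan (for example with invoke("explain")) on a real table.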
If you want to pass a more complex query, register a temporary view and use the sql method:
session %>%
invoke("read") %>%
...
invoke("load") %>%
invoke("createOrReplaceTempView", "some_name")
cass_df <- session %>%
invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
invoke("cache")
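Since the question is about running dplyr operations afterwards, the resulting DataFrame reference can probably be handed back to dplyr with sdf_register, which registers a Spark DataFrame under a table name and returns a tbl. The sketch below assumes cass_df is the reference built above; register_for_dplyr and the table name "cass_subset" are hypothetical.

```r
library(sparklyr)
library(dplyr)

# Hypothetical wrapper (not part of sparklyr): expose a Spark DataFrame
# reference (a jobj such as `cass_df` above) as a dplyr-compatible tbl.
register_for_dplyr <- function(cass_df, name = "cass_subset") {
  sdf_register(cass_df, name)
}

# Example usage, assuming `cass_df` exists and has an `id` column:
# cass_tbl <- register_for_dplyr(cass_df)
# cass_tbl %>% filter(id == 5) %>% count()
```

Because only the selected columns and filtered rows reach this point, the registered table should be much smaller than the full Cassandra table.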