
Serialize SparkR DataFrame to jobj


I'd like to be able to use the Java methods on a SparkR SparkDataFrame to write data to Cassandra.

Using the sparklyr extensions for example, I can do something like this:

sparklyr::invoke(sparklyr::spark_dataframe(spark_tbl), "write") %>%
  sparklyr::invoke("format", "org.apache.spark.sql.cassandra") %>%
  sparklyr::invoke("option", "keyspace", keyspace) %>%
  sparklyr::invoke("option", "table", table) %>%
  sparklyr::invoke("mode", "append") %>%
  sparklyr::invoke("save")

which can achieve a write speed of around 20k rows per second.

For my use case, however, I'd like to be able to use SparkR::spark.lapply so I can collect subsets of my Cassandra table locally, run a script on them and write the data back. Every method I've tried with sparklyr has ended up single-threaded, so it doesn't really utilise Spark at all.
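Roughly, the pattern I'm after looks like this (illustrative only: partition_keys, collect_subset, process_partition and write_back are hypothetical stand-ins for my actual partitioning, read, script and write steps):

partition_keys <- as.list(1:100)  # hypothetical split of the table into subsets

results <- SparkR::spark.lapply(partition_keys, function(key) {
  dt_local <- collect_subset(key)        # read one subset of the Cassandra table
  dt_out <- process_partition(dt_local)  # run my script on it locally
  write_back(dt_out)                     # write the processed rows back
})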

With SparkR, I can write the data using something like:

SparkR::saveDF(SparkR::as.DataFrame(dt_local), "",
               source = "org.apache.spark.sql.cassandra",
               table = table,
               keyspace = keyspace,
               mode = "append")

however the write speed is closer to 2k rows per second in this case. I think I could use SparkR::sparkR.callJMethod to call the same chain as in the sparklyr case and achieve the higher write speed, but I'd first need to serialize the SparkDataFrame so that I have a handle to a jobj, which I haven't been able to do. Is this possible?
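
For illustration, this is roughly the chain I have in mind, where sdf would be a jobj handle to the underlying Java Dataset (the handle is the part I'm missing):

# sdf: a jobj for the underlying Java Dataset; obtaining it is the open question
writer <- SparkR::sparkR.callJMethod(sdf, "write")
writer <- SparkR::sparkR.callJMethod(writer, "format", "org.apache.spark.sql.cassandra")
writer <- SparkR::sparkR.callJMethod(writer, "option", "keyspace", keyspace)
writer <- SparkR::sparkR.callJMethod(writer, "option", "table", table)
writer <- SparkR::sparkR.callJMethod(writer, "mode", "append")
SparkR::sparkR.callJMethod(writer, "save")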

I'm also open to any other methods of achieving this. I've investigated moving between SparkR and sparklyr, but it seems the backends are too different (as far as I know). I also believe, from here, that there's no analogous lapply for sparklyr as of yet.

Thanks for any help.


Solution

  • Long story short: it is not possible. Apache Spark doesn't support, and most likely never will support, nested parallelized operations. This is not related to a particular backend. You could try to use SparkR::*apply methods with native R clients (dbConnect, RCassandra) instead; a rough sketch follows at the end of this answer.

    You can access JVM object:

    SparkR::as.DataFrame(dt_local)@sdf
    

    but it simply cannot be used outside the driver node.
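
    A rough sketch of the *apply route with RCassandra (a sketch only: the host, port, keyspace and table are placeholders, process_partition is a hypothetical stand-in for your script, partition_keys is whatever key list you split the table by, and RCassandra must be installed on every worker; note it uses Cassandra's legacy Thrift interface):

    results <- SparkR::spark.lapply(partition_keys, function(key) {
      library(RCassandra)
      conn <- RC.connect(host = "cassandra-host", port = 9160L)  # placeholder contact point
      RC.use(conn, "my_keyspace")                                # placeholder keyspace
      dt_local <- RC.read.table(conn, "my_table")                # read rows; filter per key as needed
      dt_out <- process_partition(dt_local)                      # hypothetical: your local script
      RC.write.table(conn, "my_table", dt_out)                   # write the results back
      RC.close(conn)
      TRUE
    })

    Because each task talks to Cassandra directly through a plain R client, the tasks run in parallel across the executors without any nested Spark operations.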