apache-spark, cassandra, rdd, spark-cassandra-connector

Scala joinWithCassandraTable result (or CassandraTableScanRDD) to Dataset


I'm using the DataStax spark-cassandra-connector to access some data in Cassandra.

To efficiently access all the data I need for my query, I have to use the joinWithCassandraTable method to get data back from a bunch of partitions. This gives me an object of class com.datastax.spark.connector.rdd.CassandraTableScanRDD (or similar; for testing I'm actually just using the standard sc.cassandraTable(ks, tbl) method to read data).
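
For context, here's a rough sketch of the kind of call I mean (the keyspace, table, and key value are just placeholders):

    import com.datastax.spark.connector._

    // Join an RDD of partition keys against the Cassandra table; the result is a
    // connector RDD (CassandraJoinRDD / CassandraTableScanRDD-like), not a Dataset
    val keys = sc.parallelize(Seq(Tuple1("some_partition_key")))
    val joined = keys.joinWithCassandraTable("my_keyspace", "my_table")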

The problem is that all the methods I need to call on the resulting object expect an object of class org.apache.spark.sql.Dataset.

I have done a lot of searching around and haven't been able to find anything that helps. The closest I've found is this similar question, but I don't think it has been sufficiently answered, as it ignores the use case where the recommended way to access all the necessary data is joinWithCassandraTable.

I'm also new to Java and Scala, so sorry if I'm a little slow. Any help would be massively appreciated as I'm pretty stuck at this point.

Thanks, Akhil


Solution

  • What you can do is read your RDD into an RDD[Row] and then turn that into a DataFrame. The only issue is that we also need the schema, so let's do this in two steps.

    First, let's get the schema programmatically from our join target:

    import org.apache.spark.sql.cassandra._ // provides cassandraFormat on DataFrameReader

    val schema = spark.read.cassandraFormat("dogabase", "test").load.schema
    
    /**
    schema: org.apache.spark.sql.types.StructType = 
    StructType(StructField(owner,StringType,true), 
    StructField(dog_id,IntegerType,true), 
    StructField(dog_age,IntegerType,true), 
    StructField(dog_name,StringType,true))
    **/
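
    If you'd prefer to define the schema by hand rather than deriving it from a read, a hedged alternative is to build the same StructType yourself, assuming the column names and types shown above:

    import org.apache.spark.sql.types._

    // Hand-built equivalent of the schema printed above
    val manualSchema = StructType(Seq(
      StructField("owner", StringType, true),
      StructField("dog_id", IntegerType, true),
      StructField("dog_age", IntegerType, true),
      StructField("dog_name", StringType, true)))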
    

    Then we can make org.apache.spark.sql.Row objects out of our Cassandra driver rows:

    import com.datastax.spark.connector._ // provides joinWithCassandraTable on RDDs
    import org.apache.spark.sql.Row

    val joinResult =
      sc.parallelize(Seq(Tuple1("Russ")))
        .joinWithCassandraTable("test", "dogabase")
        .map{ case (_, cassandraRow) => Row(cassandraRow.columnValues:_*)} // Unpack our Cassandra row values into a spark.sql.Row
    

    Now that we have a schema and an RDD[Row], we can use the createDataFrame method of the Spark session:

    val dataset = spark.createDataFrame(joinResult, schema)
    dataset.show
    
    /**
    +-----+------+-------+--------+
    |owner|dog_id|dog_age|dog_name|
    +-----+------+-------+--------+
    | Russ|     1|     10|    cara|
    | Russ|     2|     11|sundance|
    +-----+------+-------+--------+
    **/
    

    And just in case you don't believe me that a DataFrame is a Dataset:

    dataset.getClass
    Class[_ <: org.apache.spark.sql.DataFrame] = class org.apache.spark.sql.Dataset
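
    And if you need a strongly typed Dataset rather than a Dataset[Row], one option (a hedged sketch; the case class name and field order here are assumptions) is to define a case class whose fields match the column names above and call .as[...]:

    import spark.implicits._ // encoders for case classes

    // Field names must match the DataFrame's column names
    case class Dog(owner: String, dog_id: Int, dog_age: Int, dog_name: String)

    val typedDataset: org.apache.spark.sql.Dataset[Dog] = dataset.as[Dog]
    typedDataset.show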
    

    EDIT: Possibly needed converters

    Some Cassandra types are not a valid basis for Spark Rows, so you may need to convert them. This can be done by writing a quick conversion function. Unfortunately, the built-in conversions the SCC uses produce an internal representation, so we can't reuse those.

    def convertToSpark(element: Any): Any = element match {
      case time: org.joda.time.LocalDate => time.toDateTimeAtStartOfDay().toDate // Convert to java.util.Date
      case other => other
    }
    

    Then, when making your rows:

    Row(cassandraRow.columnValues.map(convertToSpark):_*)
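
    Putting the pieces together, here's a hedged sketch of the full pipeline with the converter applied (same keyspace, table, and schema as above):

    import com.datastax.spark.connector._
    import org.apache.spark.sql.Row

    val convertedJoinResult =
      sc.parallelize(Seq(Tuple1("Russ")))
        .joinWithCassandraTable("test", "dogabase")
        .map{ case (_, cassandraRow) =>
          Row(cassandraRow.columnValues.map(convertToSpark):_*)} // Convert each value, then unpack into a Row

    val convertedDataset = spark.createDataFrame(convertedJoinResult, schema)
    convertedDataset.show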