
DataStax Spark Cassandra Connector - writing a DataFrame to a Cassandra table


We have recently started a big-data project using Scala, Spark and Cassandra, and I am new to all of these technologies. I am trying to do a simple task: write to and read from a Cassandra table. I can achieve this if I keep property names and column names all in lowercase or snake case (underscores), but I want to use camel case in my Scala code. Is there a better way to achieve this, using camel case in Scala and snake case in Cassandra?

We are using:

  • Scala 2.10.5
  • Spark 1.6.2
  • DataStax spark-cassandra-connector 1.6.0
  • Cassandra 3.0.9.1346
  • DataStax Enterprise 5.0.3

Cassandra table

CREATE TABLE dev.castable (
id int PRIMARY KEY,
long_name text,
name text,
short_name text)

Scala code

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLContext, SaveMode}
    import org.apache.spark.sql.cassandra._   // provides cassandraFormat
    import com.datastax.spark.connector._     // provides cassandraTable

    val conf = new SparkConf()
        .setAppName("TestHelper")
        .setMaster("local")
        .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    println("writing data to cassandra")
    val df = sqlContext.createDataFrame(List(new MyRow(2,Option("long name"), "ss", Option("short name"))))
    df.write //*** this is not working
      .cassandraFormat("castable", "dev")
      .mode(SaveMode.Append)
      .save()

    println("reading data from cassandra") //*** This is working fine
    val rdd = sc.cassandraTable[MyRow]("dev", "castable")
    rdd.foreach(println)
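The MyRow definition is not shown in the post. Assuming it mirrors the dev.castable schema with camel-cased fields (the longName/shortName fields are what trigger the column mismatch), it presumably looks like this:

```scala
// Hypothetical MyRow definition (not shown in the original post), assumed to
// mirror dev.castable: camel-cased Scala fields for snake-cased CQL columns.
case class MyRow(
  id: Int,                   // maps to column "id"
  longName: Option[String],  // intended to map to "long_name"
  name: String,              // maps to column "name"
  shortName: Option[String]  // intended to map to "short_name"
)

val row = MyRow(2, Option("long name"), "ss", Option("short name"))
```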

Exception

Exception in thread "main" java.util.NoSuchElementException: Columns not found in table dev.castable: longName, shortName
at com.datastax.spark.connector.SomeColumns.selectFrom(ColumnSelector.scala:38)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:268)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:67)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:85)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at com.aktana.spark.util.LocalTestDriver$.main(LocalTestDriver.scala:38)

I read that the spark-cassandra-connector does this conversion automatically, but it is not working for me: datastax spark-cassandra-connector


Solution

  • Using RDDs, the spark-cassandra-connector automatically converts camel-cased properties to underscored column names. Thanks again, RussS.

    Here is how I save case class objects to the Cassandra table:

        val writeRDD = sc.makeRDD(List(new MyRow(2,Option("long name"), "ss", Option("short name"))))
        writeRDD.saveToCassandra("dev", "castable")
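The RDD writer's property-to-column mapping is essentially a camelCase-to-snake_case rename. A minimal sketch of that conversion (camelToSnake is my own hypothetical helper, not the connector's internal API), which could also be used to rename DataFrame columns before df.write so the DataFrame path saves cleanly too:

```scala
// Sketch of the camelCase -> snake_case renaming the connector applies to
// RDD case-class properties; "camelToSnake" is a hypothetical helper name.
def camelToSnake(name: String): String =
  name.replaceAll("([A-Z])", "_$1").toLowerCase

// longName -> long_name, shortName -> short_name
val renamed = List("id", "longName", "name", "shortName").map(camelToSnake)

// With a DataFrame, the same rename would align the columns with the table
// (assumes df and the imports from the question are in scope):
// val snakeDf = df.columns.foldLeft(df)((d, c) => d.withColumnRenamed(c, camelToSnake(c)))
// snakeDf.write.cassandraFormat("castable", "dev").mode(SaveMode.Append).save()
```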