Tags: scala, apache-spark, cassandra, apache-spark-sql, spark-cassandra-connector

Write to Cassandra with writetime using a DataFrame in Spark


I have the following code:

  import kafka.serializer.StringDecoder
  import org.apache.spark.sql.SaveMode
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.kafka.KafkaUtils

  val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER)
  val collection = kafkaStream.map(_._2).map(parser)
  collection.foreachRDD { rdd =>
    if (!rdd.partitions.isEmpty) {
      try {
        val dfs = rdd.toDF() // requires the SQLContext implicits in scope
        dfs.write
          .format("org.apache.spark.sql.cassandra")
          .options(Map("table" -> "tablename", "keyspace" -> "dbname"))
          .mode(SaveMode.Append)
          .save()
      } catch {
        case e: Exception => e.printStackTrace()
      }
    } else {
      println("blank rdd")
    }
  }

In the above example I'm saving the Spark stream to Cassandra using a DataFrame. Now I want each row of the DataFrame to have its own specific writetime, similar to this command:

  INSERT INTO tablename (imei, date, gpsdt) VALUES ('1345', '2010-10-12', '2010-10-12 10:10:10') USING TIMESTAMP 1530313803922977;

So basically the writetime of each row should equal the gpsdt column of that row. While searching I found this link, but it shows an RDD example; I want a similar use case for a DataFrame: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md. Any suggestions? Thanks.


Solution

  • As far as I'm aware, there is no such functionality in the DataFrame API (there is a corresponding JIRA: https://datastax-oss.atlassian.net/browse/SPARKC-416). But you have the RDD anyway, before converting it into a DataFrame - why not use saveToCassandra with a per-row timestamp, as described in the link you cited? A sketch is shown after the P.S. below.

    P.S. You may run into performance problems because of the emptiness check (http://www.waitingforcode.com/apache-spark/isEmpty-trap-spark/read).
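
    Here is a minimal sketch of what that could look like, based on the per-row write options documented in the saving guide linked above. The Record case class, its field names, and the gpsdt-to-microseconds conversion are assumptions about your data; the key piece is WriteConf with TimestampOption.perRow, which reads each row's write timestamp from a named field of the saved object:

      import com.datastax.spark.connector._
      import com.datastax.spark.connector.writer.{TimestampOption, WriteConf}

      // Assumed shape of the parsed record; the extra "ts" field carries the
      // desired write timestamp in microseconds since the epoch.
      case class Record(imei: String, date: String, gpsdt: String, ts: Long)

      collection.foreachRDD { rdd =>
        if (!rdd.partitions.isEmpty) {
          // Derive the per-row timestamp from gpsdt (assumes a parseable
          // "yyyy-MM-dd HH:mm:ss" string; adapt to your actual type).
          val withTs = rdd.map { r =>
            Record(r.imei, r.date, r.gpsdt,
              java.sql.Timestamp.valueOf(r.gpsdt).getTime * 1000L)
          }
          // perRow("ts") tells the connector to use the "ts" field of each
          // object as its USING TIMESTAMP value rather than writing it as a column.
          withTs.saveToCassandra("dbname", "tablename",
            writeConf = WriteConf(timestamp = TimestampOption.perRow("ts")))
        }
      }

    Note that the timestamp is expected in microseconds, matching the unit of USING TIMESTAMP in your CQL example.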