scala, apache-spark, cassandra, spark-cassandra-connector

Inserting Data Into Cassandra table Using Spark DataFrame


I'm using Scala 2.10.5, Cassandra 3.0 and Spark 1.6. I want to insert data into Cassandra, so I tried a basic example:

scala> import com.datastax.spark.connector._
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

This works and inserts the data into Cassandra. Next, I have a CSV file that I want to insert into a Cassandra table with a matching schema:

val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true)))
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra

When I call saveToCassandra, I get an error saying that saveToCassandra is not a member of personSchemaRDD. So I thought of trying a different way:

 df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()

But I get a "cannot connect to Cassandra on ip:port" error. Can anyone tell me the best way to do this? I need to periodically save data from these files to Cassandra.


Solution

  • sqlContext.applySchema(...) returns a DataFrame and a DataFrame does not have the saveToCassandra method.

    You can use the .write method on it instead:

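    // applySchema is deprecated since Spark 1.3; sqlContext.createDataFrame(rowRDD, schema) replaces it
    val personDF = sqlContext.applySchema(rowRDD, schema)
    personDF.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save()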
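
    For the periodic loads mentioned in the question, a minimal sketch of the whole DataFrame path might look like the following. It assumes Spark 1.6 with the connector on the classpath and an existing table test.person with text columns firstname and lastname and an int column age; the object PersonWriter, its savePeople helper and the table name are hypothetical. It uses createDataFrame instead of the deprecated applySchema, and SaveMode.Append so that each run writes into the existing table.

    ```scala
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext, SaveMode}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object PersonWriter {
      // Lower-case field names, to line up with unquoted CQL column names
      val schema = StructType(Array(
        StructField("firstname", StringType, nullable = true),
        StructField("lastname", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true)))

      // Hypothetical helper: turns "first,last,age" lines into Rows and appends them to test.person
      def savePeople(sqlContext: SQLContext, lines: RDD[String]): Unit = {
        val rowRDD = lines.map(_.split(",")).map(p => Row(p(0).trim, p(1).trim, p(2).trim.toInt))
        // createDataFrame is the non-deprecated replacement for applySchema
        val personDF = sqlContext.createDataFrame(rowRDD, schema)
        personDF.write
          .format("org.apache.spark.sql.cassandra")
          .options(Map("keyspace" -> "test", "table" -> "person")) // placeholder keyspace/table
          .mode(SaveMode.Append) // write into the existing table
          .save()
      }
    }
    ```

    From spark-shell it could be called as PersonWriter.savePeople(sqlContext, sc.textFile("hdfs://localhost:9000/user/hduser/person")). Because Cassandra writes are upserts, rows whose primary key already exists are overwritten rather than duplicated.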
    

    If you want to use the saveToCassandra method, the best way is to have a schema-aware RDD built from a case class.

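    import com.datastax.spark.connector._
    // Field names match the CQL columns (unquoted CQL identifiers are stored in lower case)
    case class Person(firstname: String, lastname: String, age: Int)
    val rowRDD = person.map(_.split(",")).map(p => Person(p(0), p(1), p(2).toInt))
    rowRDD.saveToCassandra("test", "person") // "person" is a placeholder table name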
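
    The p(2).toInt call makes the job fail as soon as one line is malformed (a header row, a missing field, a non-numeric age), which matters when files are loaded periodically. A minimal sketch of a more forgiving parser, assuming comma-separated firstname,lastname,age lines; PersonParser and parsePerson are hypothetical names:

    ```scala
    import scala.util.Try

    // Same shape as the Person case class above
    case class Person(firstname: String, lastname: String, age: Int)

    object PersonParser {
      // Returns None for lines with the wrong number of fields or a non-numeric age
      def parsePerson(line: String): Option[Person] =
        line.split(",").map(_.trim) match {
          case Array(first, last, age) =>
            Try(age.toInt).toOption.map(a => Person(first, last, a))
          case _ => None
        }
    }
    ```

    With the connector import in scope, it could be plugged in as person.flatMap(line => PersonParser.parsePerson(line).toList).saveToCassandra("test", "person"), where the table name is again a placeholder. Malformed lines are silently dropped, so count or log them separately if you need to know about them.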
    

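    The DataFrame write method should also work. A "cannot connect to Cassandra" error generally means the connector is not pointed at a reachable Cassandra node, so check that your context is configured correctly, in particular that spark.cassandra.connection.host is set to the address Cassandra listens on.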
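
    The connector reads its contact point from the Spark configuration, so the host has to be set before the SparkContext is created. A minimal sketch, where CassandraConf is a hypothetical helper and 10.0.0.5 a placeholder address:

    ```scala
    import org.apache.spark.SparkConf

    object CassandraConf {
      // Assumption: "10.0.0.5" is a placeholder for the address your Cassandra node listens on
      def build(host: String = "10.0.0.5"): SparkConf =
        new SparkConf()
          .setAppName("CassandraLoad")
          // Contact point used by both saveToCassandra and the DataFrame source
          .set("spark.cassandra.connection.host", host)
          // 9042 is Cassandra's default CQL native-transport port
          .set("spark.cassandra.connection.port", "9042")
    }
    ```

    A context could then be created with val sc = new SparkContext(CassandraConf.build()) followed by val sqlContext = new SQLContext(sc). In spark-shell, where sc already exists, pass the setting at launch instead, for example spark-shell --conf spark.cassandra.connection.host=10.0.0.5.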