I'm using Scala 2.10.5, Cassandra 3.0, and Spark 1.6. I want to insert data into Cassandra, so I tried out the basic example:
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
which works and is able to insert data into Cassandra. So I have a CSV file that I want to insert into a Cassandra table by matching its schema:
val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true)))
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra
When I use saveToCassandra I get an error saying saveToCassandra is not a member of personSchemaRDD. So I thought of trying a different way:
df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
But I get "cannot connect to Cassandra on ip:port". Can anyone tell me the best way to do this? I need to periodically save data to Cassandra from the files.
sqlContext.applySchema(...) returns a DataFrame, and a DataFrame does not have the saveToCassandra method.
You could use the .write method with it:
val personDF = sqlContext.applySchema(rowRDD, schema)
personDF.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
If you want to use the saveToCassandra method, the best way is to have a schema-aware RDD, using a case class:
import com.datastax.spark.connector._

case class Person(firstName: String, lastName: String, age: Int)
val rowRDD = person.map(_.split(",")).map(p => Person(p(0), p(1), p(2).toInt))
rowRDD.saveToCassandra(keyspace, table)
The DataFrame write method should work. Check that you have configured your context correctly.
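The "cannot connect to cassandra on ip:port" error usually means the Spark context was never told where Cassandra lives. A minimal configuration sketch, assuming Cassandra is reachable at 127.0.0.1 on the default native transport port 9042 (substitute your own host and port):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// The Spark Cassandra Connector reads these properties to locate the cluster.
val conf = new SparkConf()
  .setAppName("csv-to-cassandra")
  .set("spark.cassandra.connection.host", "127.0.0.1") // your Cassandra node's IP
  .set("spark.cassandra.connection.port", "9042")      // default native transport port

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
```

With the connection host set this way, both the saveToCassandra call and the DataFrame write with format "org.apache.spark.sql.cassandra" should reach the cluster. You can also pass the same properties on the command line with --conf when submitting the job.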