scala, apache-spark, cassandra, spark-streaming, spark-cassandra-connector

Spark Kafka stream to Cassandra in Scala


I'm trying to insert Kafka stream JSON data into Cassandra using Scala, but unfortunately I'm getting stuck. My code is:

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val records = kafkaStream.map(_._2)
val collection = records.flatMap(_.split(",")).map(s => event(s(0).toString, s(1).toString))
case class event(vehicleid: String, vehicletype: String)
collection.foreachRDD(x => println(x))
collection.saveToCassandra("traffickeyspace", "test", SomeColumns("vehicleid", "vehicletype"))

The error I'm getting is:

not enough arguments for method saveToCassandra: (implicit connector: com.datastax.spark.connector.cql.CassandraConnector, implicit rwf: com.datastax.spark.connector.writer.RowWriterFactory[event])Unit. Unspecified value parameter rwf. kafkatesting.scala  /SparkRedis/src/com/spark/test  line 48 Scala Problem 

and the other error is:

could not find implicit value for parameter rwf: com.datastax.spark.connector.writer.RowWriterFactory[event]    kafkatesting.scala  /SparkRedis/src/com/spark/test  line 48 Scala Problem

My JSON record from the producer is:

{"vehicleId":"3a92516d-58a7-478e-9cff-baafd98764a3","vehicleType":"Small Truck","routeId":"Route-37","longitude":"-95.30818","latitude":"33.265877","timestamp":"2018-03-28 06:21:47","speed":58.0,"fuelLevel":25.0}

Solution

  • You actually can't declare your case class where you have it. Case classes have to be defined at the top-level scope to get the TypeTag they need. See this question for more details: Scala - No TypeTag Available Exception when using case class to try to get TypeTag?

    So move your case class to the top-level scope of the file you are in, as in the sketch below. That way it gets its TypeTag, which gives it its ColumnMapper, which in turn lets it pick up the implicit RowWriterFactory.
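
For reference, here is a minimal sketch of the whole restructured file, assuming the same Kafka 0.8 direct-stream API as in the question. The broker address, topic name, and Cassandra host are placeholder assumptions, and the comma-splitting logic is kept from the question even though a real JSON parser would suit the record shown above better:

import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Defined at the top level, the case class gets a TypeTag, which gives the
// connector its ColumnMapper and hence the implicit RowWriterFactory[event].
case class event(vehicleid: String, vehicletype: String)

object KafkaToCassandra {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-to-cassandra")
      .setMaster("local[2]")                                // assumed local run
      .set("spark.cassandra.connection.host", "127.0.0.1")  // assumed Cassandra host
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // assumed broker
    val topics = Set("traffic")                                       // assumed topic

    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    val records = kafkaStream.map(_._2)

    // Kept from the question for illustration; replace with proper JSON parsing.
    val collection = records.flatMap(_.split(",")).map(s => event(s(0).toString, s(1).toString))

    // Compiles now, because the implicit RowWriterFactory[event] can be resolved.
    collection.saveToCassandra("traffickeyspace", "test", SomeColumns("vehicleid", "vehicletype"))

    ssc.start()
    ssc.awaitTermination()
  }
}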