I'm trying to insert JSON data from a Kafka stream into Cassandra using Scala, but I'm stuck. My code is:
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val records = kafkaStream.map(_._2)
val collection = records.flatMap(_.split(",")).map(s => event(s(0).toString, s(1).toString))
case class event(vehicleid: String, vehicletype: String)
collection.foreachRDD(x => println(x))
collection.saveToCassandra("traffickeyspace", "test", SomeColumns("vehicleid", "vehicletype"))
The error I'm getting is:
not enough arguments for method saveToCassandra: (implicit connector: com.datastax.spark.connector.cql.CassandraConnector, implicit rwf: com.datastax.spark.connector.writer.RowWriterFactory[event])Unit. Unspecified value parameter rwf. kafkatesting.scala /SparkRedis/src/com/spark/test line 48 Scala Problem
and the other error is:
could not find implicit value for parameter rwf: com.datastax.spark.connector.writer.RowWriterFactory[event] kafkatesting.scala /SparkRedis/src/com/spark/test line 48 Scala Problem
My JSON record from the producer is:
{"vehicleId":"3a92516d-58a7-478e-9cff-baafd98764a3","vehicleType":"Small Truck","routeId":"Route-37","longitude":"-95.30818","latitude":"33.265877","timestamp":"2018-03-28 06:21:47","speed":58.0,"fuelLevel":25.0}
You cannot declare your case class where you currently have it. Case classes have to be defined at the top-level scope to get the TypeTag they need. For more details, see: Scala - No TypeTag Available Exception when using case class to try to get TypeTag?
So move your case class to the top-level scope of the file you are in. That way it gets its TypeTag, which allows it to get its ColumnMapper, which in turn allows it to pick up its implicit RowWriterFactory.
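
As a rough sketch of that layout (not a definitive implementation), the file could look something like the following. The case class sits at the top level, outside any object or method. Several things here are assumptions that are not in the question: the object name KafkaToCassandra, the broker address, the topic name "traffic", the Cassandra host, the batch interval, and the parseEvent helper, which uses json4s (shipped with Spark) to read the two fields. The helper is there because splitting the record on "," and then indexing with s(0) would give single characters rather than field values.

```scala
import com.datastax.spark.connector._            // SomeColumns and String -> column conversions
import com.datastax.spark.connector.streaming._  // adds saveToCassandra to DStreams
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Declared at the top level of the file, not inside a method,
// so the compiler can supply the TypeTag the connector asks for.
case class Event(vehicleid: String, vehicletype: String)

object KafkaToCassandra {

  // Hypothetical helper: extracts the two fields from one JSON record.
  // Field names are taken from the sample record in the question.
  def parseEvent(json: String): Event = {
    implicit val formats: Formats = DefaultFormats
    val parsed = parse(json)
    Event(
      (parsed \ "vehicleId").extract[String],
      (parsed \ "vehicleType").extract[String]
    )
  }

  def main(args: Array[String]): Unit = {
    // Assumed local settings; adjust to your environment.
    val conf = new SparkConf()
      .setAppName("KafkaToCassandra")
      .setMaster("local[2]")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // assumed broker
    val topics = Set("traffic")                                        // assumed topic

    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Keep only the message value, then turn each JSON record into an Event.
    val events = kafkaStream.map(_._2).map(parseEvent)

    events.saveToCassandra("traffickeyspace", "test", SomeColumns("vehicleid", "vehicletype"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With the case class at the top level, the implicit RowWriterFactory[Event] should resolve and both compile errors should go away. The JSON parsing is a separate concern from the implicit error, so swap in whichever JSON library you already use.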