I am trying to read data from Kafka and store it in Cassandra tables through Spark RDDs.
I get an error when compiling the code:
/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)
[error] val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
When I run the code below manually in the interactive spark-shell it works fine, but when I compile it for spark-submit the error above appears:
// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)
// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long"))
KafkaUtils.createDirectStream returns a stream of (key, value) tuples, since messages in Kafka are optionally keyed. In your case each element is of type (String, String), and a tuple has no split method. If you want to split the value, you have to take it out of the tuple first:
val lines = messages
  .map(line => line._2.split(','))
  .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))
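The mismatch can be reproduced without Spark or Kafka. In this minimal sketch a plain Scala Seq[(String, String)] stands in for the stream's (key, value) records; the object name and the sample record are made up for illustration, and the tuple shape mirrors the city_name, jan_temp, lat, long columns from the question.

```scala
// Stand-alone sketch: a Seq of (key, value) pairs stands in for the
// Kafka DStream[(String, String)], so no Spark or Kafka dependency is needed.
object ExtractValueThenSplit {
  // Same tuple shape as the answer: (city_name, jan_temp, lat, long)
  def parse(records: Seq[(String, String)]): Seq[(String, Double, Double, Double)] =
    records
      .map(record => record._2.split(','))                              // take the value out, then split it
      .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))    // convert the numeric fields

  def main(args: Array[String]): Unit = {
    // Hypothetical sample record; parse ignores the key entirely
    val records: Seq[(String, String)] = Seq(("key-1", "Boston,-1.5,42.36,-71.06"))
    parse(records).foreach(println)
    // Calling split on the pair itself (records.map(r => r.split(',')))
    // would not compile, because a tuple has no split method.
  }
}
```

DStream.map takes a function from element to result just like Seq.map, so the two lambdas carry over to messages unchanged.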
Or using partial function syntax:
val lines = messages
  .map { case (_, value) => value.split(',') }
  .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))
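The pattern-matching form can also fold both steps into a single map. This is a sketch on the same kind of Seq stand-in; the object name and sample records are hypothetical, and a null key is used only to show that the _ pattern ignores whatever the key holds.

```scala
// Stand-alone sketch of the pattern-matching variant in one pass.
object PatternMatchValue {
  // Destructure each (key, value) pair, ignore the key, parse the value
  def parse(records: Seq[(String, String)]): Seq[(String, Double, Double, Double)] =
    records.map { case (_, value) =>
      val s = value.split(',')
      (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble)
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical records; the null key stands in for a message without a key
    val records: Seq[(String, String)] = Seq(
      (null, "Denver,-1.3,39.74,-104.99"),
      ("k2", "Miami,20.1,25.76,-80.19")
    )
    parse(records).foreach(println)
  }
}
```

Whether to use one map or two is largely a matter of style; both produce the same tuples for saveToCassandra.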