scala, apache-spark, apache-kafka, spark-streaming, spark-submit

Value split is not a member of (String, String)


I am trying to read data from Kafka and store it in Cassandra tables through Spark RDDs.

I get this error while compiling the code:

/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)

[error]     val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error]                                               ^
[error] one error found

[error] (compile:compileIncremental) Compilation failed

The code is below. When I run it manually in the interactive spark-shell it works fine, but the error appears when compiling the code for spark-submit.

// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)

// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long")) 

Solution

  • KafkaUtils.createDirectStream returns a stream of key/value tuples (since messages in Kafka are optionally keyed). In your case each element is of type (String, String), and split is not defined on tuples. If you want to split the value, you first have to extract it from the tuple:

    val lines = 
      messages
       .map(line => line._2.split(','))
       .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
    

    Or, using partial-function syntax to destructure the tuple:

    val lines = 
      messages
       .map { case (_, value) => value.split(',') }
       .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
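
    The parsing itself does not depend on Spark, so it can be checked with plain Scala collections. Below is a minimal sketch of the same transformation; the sample records and the SplitDemo/parse names are made up for illustration:

    ```scala
    object SplitDemo {
      // Mirrors the DStream transformation: extract the Kafka value from the
      // (key, value) tuple, split on commas, and build a 4-tuple of fields.
      def parse(messages: Seq[(String, String)]): Seq[(String, Double, Double, Double)] =
        messages
          .map { case (_, value) => value.split(',') }   // calling split on the tuple itself would not compile
          .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))

      def main(args: Array[String]): Unit = {
        // Hypothetical sample data in the city_name,jan_temp,lat,long layout
        val messages = Seq(
          ("", "Boston,30.0,42.36,-71.06"),
          ("", "Miami,70.0,25.76,-80.19")
        )
        parse(messages).foreach(println)
      }
    }
    ```

    The same `case (_, value)` destructuring works unchanged on the DStream, since map on a DStream of tuples accepts the same function literal.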