I am receiving JSON data from Kafka brokers and reading it with Spark Streaming in Scala. Here is an example record:
{"timestamp":"2020-12-11 22:35:00.000000 UTC","tech":"Spark","version":2,"start_time":1607725688402210,"end_time":1607726131636059}
I receive this data as an RDD[String] in my Scala code. Now I want to read a particular key from each row, for example 'version' from the data above. I am able to do this as follows:
import scala.util.parsing.json.JSON

for (record <- rdd) {
  val jsonRecord = JSON.parseFull(record)
  // parseFull returns Option[Any]; the top level of this payload is a Map
  val globalMap = jsonRecord.get.asInstanceOf[Map[String, Any]]
  // parseFull parses JSON numbers as Double, so "version" (2) is a Double, not a String
  val version = globalMap("version").asInstanceOf[Double]
}
But I am not sure whether this is the best way to read an RDD containing JSON data. Please suggest.
Thanks,
Use the json4s library to parse the JSON data. It ships with Spark by default, so there is no need to add any extra dependencies.
Check the code below.
scala> rdd.collect.foreach(println)
{"timestamp":"2020-12-11 22:35:00.000000 UTC","tech":"Spark","version":2,"start_time":1607725688402210,"end_time":1607726131636059}
scala> :paste
// Entering paste mode (ctrl-D to finish)
rdd.map { row =>
  // json4s ships with Spark; import it and define the implicit Formats
  // inside the closure so nothing has to be serialized from the driver.
  import org.json4s._
  import org.json4s.jackson.JsonMethods._
  implicit val formats: Formats = DefaultFormats
  // Parse the JSON message with json4s' parse function.
  val jsonData = parse(row)
  // Extract the required fields from the parsed JSON.
  // \\ is a recursive field lookup; use \ for a direct child of the root.
  val version = (jsonData \\ "version").extract[Int]
  val timestamp = (jsonData \\ "timestamp").extract[String]
  (version, timestamp)
}
.collect
.foreach(println)
// Exiting paste mode, now interpreting.
(2,2020-12-11 22:35:00.000000 UTC)
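If the schema is fixed, you can also map each record onto a case class in one call with extract. Here is a minimal sketch under that assumption (the Record case class is a name I made up to mirror your payload):

// Hypothetical case class mirroring the JSON schema of each record.
case class Record(timestamp: String, tech: String, version: Int, start_time: Long, end_time: Long)

rdd.map { row =>
  import org.json4s._
  import org.json4s.jackson.JsonMethods._
  implicit val formats: Formats = DefaultFormats
  // extract[Record] maps every field onto the case class at once.
  parse(row).extract[Record]
}
.collect
.foreach(println)

This variant fails fast with a MappingException if a field is missing or has an unexpected type, which makes schema drift in the Kafka topic easier to spot.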