
Reading JSON RDD using Spark Scala


I am receiving JSON data from Kafka brokers and reading it using Spark Streaming and Scala. The following is example data:

{"timestamp":"2020-12-11 22:35:00.000000 UTC","tech":"Spark","version":2,"start_time":1607725688402210,"end_time":1607726131636059}

I receive this data as RDD[String] in my Scala code. Now I want to read a particular key from each data row, for example 'version' from the data above. I am able to do this as follows:

import scala.util.parsing.json.JSON

for (record <- rdd) {
  val jsonRecord = JSON.parseFull(record)
  val globalMap = jsonRecord.get.asInstanceOf[Map[String, Any]]
  // JSON.parseFull parses numeric values as Double, not String
  val version = globalMap.get("version").get.asInstanceOf[Double]
}

But I am not sure whether this is the best way to read an RDD containing JSON data. Please suggest.

Thanks,


Solution

  • Use the json4s library to parse the JSON data; it ships with Spark by default, so there is no need to pull in extra libraries.

    Check the code below.

    scala> rdd.collect.foreach(println)
    
    {"timestamp":"2020-12-11 22:35:00.000000 UTC","tech":"Spark","version":2,"start_time":1607725688402210,"end_time":1607726131636059}
    
    
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    rdd.map { row =>

        // Import required libraries for the json4s parser.
        import org.json4s._
        import org.json4s.jackson.JsonMethods._
        implicit val formats = DefaultFormats

        // Parse the json message using the parse function from the json4s lib.
        val jsonData = parse(row)

        // Extract the required fields from the parsed json data.
        // Note: \\ searches the whole document for a key; \ matches only top-level fields.
        val version = (jsonData \\ "version").extract[Int]
        val timestamp = (jsonData \\ "timestamp").extract[String]

        (version, timestamp)
    }
    .collect
    .foreach(println)
    
    
    // Exiting paste mode, now interpreting.
    
    (2,2020-12-11 22:35:00.000000 UTC)
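
    If you need several fields from each record, json4s can also deserialize the whole row into a case class in one step. A minimal sketch (the Record case class here is hypothetical, with field names chosen to mirror the JSON keys):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    // Hypothetical case class mirroring the JSON keys; the UTC timestamp stays a String.
    case class Record(timestamp: String, tech: String, version: Int,
                      start_time: Long, end_time: Long)

    rdd.map { row =>
        implicit val formats = DefaultFormats
        // Deserialize the full JSON object into the case class in one step.
        parse(row).extract[Record]
    }
    .collect
    .foreach(println)

    If some rows may be malformed, extractOpt[Record] returns an Option instead of throwing, so bad records can be dropped with flatMap.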