I'm using Spark Streaming with Scala and I'm getting JSON records from Kafka. I would like to parse them so I can get the values (datetime and quality) and process them.
Here is my code:

stream.foreachRDD(rdd => {
  rdd.collect().foreach(i =>
    println(msgParse(i.value()).quality)
  )
})
And I have this case class and my parse function:

case class diskQuality(datetime: String, quality: Double) extends Serializable

def msgParse(value: String): diskQuality = {
  import org.json4s._
  import org.json4s.native.JsonMethods._
  implicit val formats = DefaultFormats
  parse(value).extract[diskQuality]
}
I've added this dependency:
libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4"
The records I'm receiving have this format:
"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
However I get this error:
Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}"
EDIT:
When I try to parse the following string using the same function, it works. But even though the Kafka messages arrive in the same format, I still get the same error:
val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
I'm using scalaVersion := "2.10.6" and json4s-native_2.10.
Any help would be really appreciated. Thank you for your time.
Looks like the problem is on your Kafka producer side: the message is being serialized twice, so what arrives on the wire is a JSON-encoded string (with escaped quotes and surrounding quotes) rather than a plain JSON object. Fix the producer so that the message body is exactly:

{"datetime":"14-05-2017 14:18:30","quality":92.6}

That will give msgParse a correctly formatted JSON string.
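To illustrate the difference, here is a minimal, dependency-free sketch of what the double-encoded payload looks like versus what msgParse expects. The object name and the string-based "repair" are illustrative only; it assumes the outer layer is a simple JSON string encoding (quotes escaped with backslashes), and the real fix remains changing the producer:

```scala
object ProducerFormatDemo extends App {
  // What the producer currently sends: the JSON text wrapped in an extra
  // pair of quotes, with the inner quotes backslash-escaped (a JSON string
  // that *contains* JSON).
  val doubleEncoded = "\"{\\\"datetime\\\":\\\"14-05-2017 14:18:30\\\",\\\"quality\\\":92.6}\""

  // What msgParse expects: the raw JSON object itself.
  val plainJson = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"

  // A consumer-side stopgap (only if you cannot change the producer yet):
  // drop the outer quotes and unescape the inner ones before parsing.
  val repaired = doubleEncoded
    .stripPrefix("\"")
    .stripSuffix("\"")
    .replace("\\\"", "\"")

  println(repaired == plainJson) // true
}
```

Feeding `repaired` (or, better, a correctly produced message) into msgParse avoids the "expected field or array" error, because the parser then sees an object instead of a string literal.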