Tags: json, scala, apache-kafka, spark-streaming, json4s

Scala: Parsing a JSON record coming from Kafka


I'm using Spark Streaming with Scala and I'm getting JSON records from Kafka. I would like to parse them so I can extract the values (datetime and quality) and process them.

Here is my code:

stream.foreachRDD(rdd => {
  rdd.collect().foreach(i =>
    println(msgParse(i.value()).quality)
  )
})

And here are my case class and parse function:

case class diskQuality(datetime: String, quality: Double) extends Serializable

def msgParse(value: String): diskQuality = {
  import org.json4s._
  import org.json4s.native.JsonMethods._

  implicit val formats = DefaultFormats

  parse(value).extract[diskQuality]
}

I've added this dependency:

libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4"

The records I'm receiving have this format:

"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"

However, I get this error:

Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}"

EDIT:

When I try to parse the following string with the same function, it works. But even though the Kafka messages come in the same format, I still get the same error:

val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"

I'm using scalaVersion := "2.10.6" and "json4s-native_2.10".
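For what it's worth, the hard-coded test string only looks escaped in the source: the `\"` sequences are Scala escape sequences, so at runtime it is already plain JSON. A record that literally contains backslash characters is a different string, as this quick check sketches (both values below just illustrate the two cases):

```scala
// In Scala source, \" is an escape sequence: at runtime this string
// holds plain double quotes and is valid JSON.
val hardcoded = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"

// A record that literally contains backslashes (and outer quotes), as
// the Kafka messages apparently do, is a different, longer string:
val escaped = "\"{\\\"datetime\\\":\\\"14-05-2017 14:18:30\\\",\\\"quality\\\":92.6}\""

println(hardcoded == escaped)  // false: only the first one is raw JSON
```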

Any help would be really appreciated. Thank you for your time.


Solution

  • It looks like the problem is on your Kafka producer side: it is sending a JSON-encoded string (with surrounding quotes and escaped inner quotes) instead of raw JSON. Fix the producer so that each record ends up in the following format, with the escaped quotes replaced:

    {"datetime":"14-05-2017 14:18:30","quality":92.6}

    That will give you a correctly formatted JSON string that extract[diskQuality] can handle.
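If the producer cannot be changed right away, one consumer-side workaround is to undo the extra encoding before calling msgParse. This is only a sketch, assuming every record arrives wrapped in outer quotes with escaped inner quotes; the helper name `unwrap` is made up here and is not part of json4s or Kafka:

```scala
// Hypothetical helper: undoes the extra layer of string encoding,
// assuming records look like "{\"datetime\":...,\"quality\":...}".
def unwrap(record: String): String =
  record
    .stripPrefix("\"")      // drop the leading outer quote, if present
    .stripSuffix("\"")      // drop the trailing outer quote, if present
    .replace("\\\"", "\"")  // turn escaped quotes back into plain quotes

// Runtime content of a record as described in the question:
val record = "\"{\\\"datetime\\\":\\\"14-05-2017 14:18:30\\\",\\\"quality\\\":92.6}\""
val cleaned = unwrap(record)
// cleaned now holds {"datetime":"14-05-2017 14:18:30","quality":92.6},
// which msgParse should be able to extract.
```

With such a helper, the streaming loop would call msgParse(unwrap(i.value())) instead of msgParse(i.value()).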