Tags: scala, apache-kafka, kafka-consumer-api, apache-flink, flink-streaming

Converting a DataStream[ObjectNode] of JSON key/value pairs to a Map in Scala


I am trying to read JSON data from Kafka and process it in Scala. I am new to Flink and Kafka streaming, so please try to answer by giving solution code. I want to be able to convert each record to a Map containing all the key/value pairs.

map1.get("FC196") should give me Dormant, where map1 is the map containing the key/value pairs.

The challenge I'm facing is converting the DataStream[ObjectNode] (the st variable in the code) to a map of key/value pairs. I am using JSONDeserializationSchema; if I use SimpleStringSchema I get DataStream[String] instead. I am open to alternative suggestions.

Input format from Kafka:

{"FC196":"Dormant","FC174":"A262210940","FC195":"","FC176":"40","FC198":"BANKING","FC175":"AHMED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053575","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"afs","FC188":"BR08","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"}

Code:

import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema



object WordCount {
  def main(args: Array[String]) {

    // kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.*:9092")
    properties.setProperty("zookeeper.connect", "***.**.*.*:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "latest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // st is a DataStream[ObjectNode]: each record is deserialized into a Jackson ObjectNode
    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new JSONDeserializationSchema(), properties))

    st.print()

    env.execute()
  }
}

My code after the changes:

import java.util.Properties

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s.DefaultFormats
import org.json4s._
import org.json4s.native.JsonMethods
import scala.util.Try



object WordCount{
  def main(args: Array[String]) {

    case class CC(key:String)

    implicit val formats = org.json4s.DefaultFormats
    // kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)
      .map(_.extract[CC])

    st.print()

    env.execute()
  }
}

And for some reason I cannot put a Try in the flatMap as you described.

Error:

INFO [main] (TypeExtractor.java:1804) - No fields detected for class org.json4s.JsonAST$JValue. Cannot be used as a PojoType. Will be handled as GenericType
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:994)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:519)
    at org.apache.flink.quickstart.WordCount$.main(WordCount.scala:36)
    at org.apache.flink.quickstart.WordCount.main(WordCount.scala)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$$anon$4
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
    ... 6 more

Process finished with exit code 1

Solution

  • There are two tasks that need to be dealt with here:

    1. Parsing the raw json payload to some form of AST
    2. Converting the AST into a format you can use.

    If you use the SimpleStringSchema, you can choose a nice JSON parser and unmarshal the JSON payload in a simple flatMap operator.

    Some dependencies for your build.sbt

    "org.json4s" %% "json4s-core" % "3.5.1",
    "org.json4s" %% "json4s-native" % "3.5.1"
    

    There are dozens of JSON libraries in Scala to choose from; a nice overview can be found at https://manuel.bernhardt.io/2015/11/06/a-quick-tour-of-json-libraries-in-scala/

    Then some parsing:

    scala> import org.json4s.native.JsonMethods._
    import org.json4s.native.JsonMethods._
    
    scala> val raw = """{"key":"value"}"""
    raw: String = {"key":"value"}
    
    scala> parse(raw)
    res0: org.json4s.JValue = JObject(List((key,JString(value))))
    

    At this stage an AST is available. This can be converted to a Map as follows:

    scala> res0.values
    res1: res0.Values = Map(key -> value)
    

    Keep in mind that Json4s does not perform exception handling itself, so parsing can throw an exception. That is something you should avoid when you fetch data from Kafka, because an uncaught exception will eventually kill your job.

    In Flink, this would look like this:

    env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => Try(JsonMethods.parse(raw)).toOption) // this discards failed records; handle them better, e.g. log (see the sketch below)
      .map(_.values)
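
    As a sketch of that "handle better" option, here is one way to log and skip failures instead of silently dropping them. This assumes import scala.util.{Try, Success, Failure}, and the println merely stands in for whatever logging setup you actually use:

    .flatMap { raw =>
      Try(JsonMethods.parse(raw)) match {
        case Success(ast) => Some(ast) // parsed fine, forward the AST
        case Failure(e) =>
          println(s"Skipping unparseable record: ${e.getMessage}") // placeholder for a real logger
          None // drop the bad record instead of failing the job
      }
    }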
    

    However, I would recommend representing your data as a case class. This would need another step to map the AST to a case class.

    Continuing the example above:

    scala> implicit val formats = org.json4s.DefaultFormats
    formats: org.json4s.DefaultFormats.type = org.json4s.DefaultFormats$@341621da
    
    scala> case class CC(key: String)
    defined class CC
    
    scala> parse(raw).extract[CC]
    res20: CC = CC(value)
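
    The same extraction also works if you prefer the Map the question asks for: extract can target Map[String, String] directly (again assuming the implicit DefaultFormats above is in scope):

    scala> parse(raw).extract[Map[String, String]]
    res21: Map[String,String] = Map(key -> value)

    Applied to the payload from the question, map1.get("FC196") on such a map returns Some("Dormant"); Scala's Map.get returns an Option, so use map1("FC196") for the bare value.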
    

    Or in Flink:

    env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => Try(JsonMethods.parse(raw)).toOption)
      .map(_.extract[CC])
    

    Update:

    Just move the implicit formats outside of the main method:

    object WordCount {
        implicit val formats = org.json4s.DefaultFormats
        def main(args: Array[String]) = {...}
    }
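
    Putting the pieces together, a complete minimal sketch under the same assumptions as above (topic "new", the json4s dependencies from build.sbt, and extraction straight to Map[String, String] so that lookups like map1("FC196") yield "Dormant"; the broker address is a placeholder):

    import java.util.Properties

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.json4s.native.JsonMethods

    import scala.util.Try

    object WordCount {
      // declared outside main so the extract closure below stays serializable
      implicit val formats = org.json4s.DefaultFormats

      def main(args: Array[String]): Unit = {
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092") // placeholder, use your brokers
        properties.setProperty("group.id", "afs")
        properties.setProperty("auto.offset.reset", "earliest")

        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val maps = env
          .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
          .flatMap(raw => Try(JsonMethods.parse(raw)).toOption) // drops unparseable records
          .map(_.extract[Map[String, String]])                  // e.g. Map("FC196" -> "Dormant", ...)

        maps.print()
        env.execute()
      }
    }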