I am trying to read JSON data from Kafka and process it in Scala. I am new to Flink and Kafka streaming, so please try to answer by giving solution code. I want to be able to convert each message to a Map containing all of its key/value pairs.
map1.get("FC196") should give me Dormant, where map1 is the Map containing the key/value pairs.
The challenge I'm facing is converting the DataStream[ObjectNode] (the st variable in the code) to a map of key/value pairs. I am using JSONDeserializationSchema; if I use SimpleStringSchema I get a DataStream[String] instead. I am open to alternative suggestions.
Input format from Kafka:
{"FC196":"Dormant","FC174":"A262210940","FC195":"","FC176":"40","FC198":"BANKING","FC175":"AHMED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053575","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"afs","FC188":"BR08","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"}
Code:
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema

object WordCount {
  def main(args: Array[String]) {
    // Kafka consumer properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.*:9092")
    properties.setProperty("zookeeper.connect", "***.**.*.*:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "latest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // yields a DataStream[ObjectNode]
    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new JSONDeserializationSchema(), properties))

    st.print()
    env.execute()
  }
}
My code after the changes:
import java.util.Properties
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods
import scala.util.Try

object WordCount {
  def main(args: Array[String]) {
    case class CC(key: String)
    implicit val formats = org.json4s.DefaultFormats

    // Kafka consumer properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)
      .map(_.extract[CC])

    st.print()
    env.execute()
  }
}
And for some reason I cannot put a Try in the flatMap as you described.
Error:
INFO [main] (TypeExtractor.java:1804) - No fields detected for class org.json4s.JsonAST$JValue. Cannot be used as a PojoType. Will be handled as GenericType
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:994)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:519)
at org.apache.flink.quickstart.WordCount$.main(WordCount.scala:36)
at org.apache.flink.quickstart.WordCount.main(WordCount.scala)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$$anon$4
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 6 more
Process finished with exit code 1
There are two tasks that need to be dealt with here: parsing the raw JSON payload, and converting the parsed result into a Map (or, better, a case class).
If you use the SimpleStringSchema, you can choose a nice JSON parser and unmarshal the JSON payload in a simple flatMap operator.
Some dependencies for your build.sbt:
libraryDependencies ++= Seq(
  "org.json4s" %% "json4s-core" % "3.5.1",
  "org.json4s" %% "json4s-native" % "3.5.1"
)
There are dozens of JSON libraries in Scala to choose from; a nice overview can be found here: https://manuel.bernhardt.io/2015/11/06/a-quick-tour-of-json-libraries-in-scala/
Then some parsing:
scala> import org.json4s.native.JsonMethods._
import org.json4s.native.JsonMethods._
scala> val raw = """{"key":"value"}"""
raw: String = {"key":"value"}
scala> parse(raw)
res0: org.json4s.JValue = JObject(List((key,JString(value))))
At this stage an AST is available. This can be converted to a Map as follows:
scala> res0.values
res1: res0.Values = Map(key -> value)
Keep in mind that Json4s does not perform exception handling for you, so parsing can throw an exception. That is something you should guard against when you fetch data from Kafka, since an unhandled exception will eventually kill your job.
In Flink, this would look like this:
env
  .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
  .flatMap(raw => Try(JsonMethods.parse(raw)).toOption) // silently discards unparsable messages; handle failures better in practice, e.g. log them
  .map(_.values)
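To tie this back to the map1.get("FC196") requirement from the question: values is path-dependent on the concrete JValue subtype, so a cast is needed to work with a plain Map. A sketch, reusing env and properties from above and assuming every message is a JSON object:
env
  .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
  .flatMap(raw => Try(JsonMethods.parse(raw)).toOption)
  .map(_.values.asInstanceOf[Map[String, Any]]) // each record is now a Map of key/value pairs
  .map(map1 => map1.get("FC196"))               // Some(Dormant) for the sample payload
  .print()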
However, I would recommend representing your data as a case class. That needs one more step: mapping the AST to the case class.
In the REPL example above:
scala> implicit val formats = org.json4s.DefaultFormats
formats: org.json4s.DefaultFormats.type = org.json4s.DefaultFormats$@341621da
scala> case class CC(key: String)
defined class CC
scala> parse(raw).extract[CC]
res20: CC = CC(value)
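With your actual payload you would list the fields you care about as case class members. A sketch using a few of the keys from the sample message (the name Txn and the field selection are arbitrary):
case class Txn(FC196: String, FC174: String, FC184: String)
// against the sample message from the question:
// parse(msg).extract[Txn] == Txn("Dormant", "A262210940", "Transfer")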
Or in Flink:
env
  .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
  .flatMap(raw => Try(JsonMethods.parse(raw)).toOption)
  .map(_.extract[CC])
Update:
Just move the implicit formats outside of the main method. DefaultFormats contains a non-serializable anonymous class, so it must not be captured as a local val inside the closure; as an object member it is resolved statically instead:
object WordCount {
  implicit val formats = org.json4s.DefaultFormats
  def main(args: Array[String]) = {...}
}
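For completeness, a minimal sketch of the whole job with that fix applied (same topic name and properties as in your code; the CC field here is taken from the sample payload, extend it as needed; untested against your setup):
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods
import scala.util.Try

object WordCount {
  // object-level, so the extract closure does not capture a non-serializable local
  implicit val formats = org.json4s.DefaultFormats

  // extend with the other FC fields you need
  case class CC(FC196: String)

  def main(args: Array[String]) {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "afs")
    properties.setProperty("auto.offset.reset", "earliest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => Try(JsonMethods.parse(raw)).toOption) // drop unparsable messages
      .map(_.extract[CC])

    st.print() // CC(Dormant) for the sample payload
    env.execute()
  }
}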