Lift-Json Extracting from JSON object

Tags: scala, apache-spark, spark-streaming, lift-json


I have this code below:

object Test {
    def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Spark").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)

        val ssc = new StreamingContext(sc, Seconds(3))
        val kafkaBrokers = Map("metadata.broker.list" -> "HostName:9092")
        val offsetMap = Map(TopicAndPartition("topic_test", 0), 8)
        val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaBrokers, offsetMap)

        var offsetArray = Array[OffsetRange]()
        lines.transform { rdd =>
            offsetArray = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd
        }.map {
            _.message()
        }.foreachRDD { rdd =>
            /* NEW CODE */
        }

        ssc.start()
        ssc.awaitTermination()
    }
}

I have added the new code under the comment /* NEW CODE */. My question is: the lines val will contain a sequence of RDDs coming from the Kafka server every 3 seconds. Then I am grabbing the message using the map function.

But I am a little confused about what the foreachRDD function does. Does it iterate over all of the RDDs in the lines DStream (which is what I am trying to do)? The thing is, the parse function from the lift-json library only accepts a String, so I need to iterate over all of the RDDs and pass that String value to the parse function, which is what I attempted to do. But nothing is being printed out for some reason.


Solution

  • If you want to read data from a specific offset, you're using the wrong overload.

    The one you need is this:

    createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
        ssc: StreamingContext,
        kafkaParams: Map[String, String],
        fromOffsets: Map[TopicAndPartition, Long],
        messageHandler: (MessageAndMetadata[K, V]) ⇒ R): InputDStream[R]
    

    You need a Map[TopicAndPartition, Long]:

    val offsetMap = Map(TopicAndPartition("topic_test", 0) -> 8L)
    

    And you need to pass a function which receives a MessageAndMetadata[K, V] and returns your desired type, for example:

    val extractKeyValue: MessageAndMetadata[String, String] => (String, String) = 
          msgAndMeta => (msgAndMeta.key(), msgAndMeta.message())
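
    If you only need the payload (which is all that lift-json's parse accepts), a handler that returns just the message String would also do the job. This is only a small variant sketch, not something the API requires:

    val extractMessage: MessageAndMetadata[String, String] => String =
          msgAndMeta => msgAndMeta.message()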
    

    And use it, remembering to also spell out the handler's result type (String, String) as the fifth type parameter:

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaBrokers, offsetMap, extractKeyValue)
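
  • As for what foreachRDD does: it is an output operation that runs once per batch interval (every 3 seconds here) and hands you the single RDD generated for that batch. It does not iterate over the records by itself, so inside it you still need an action such as rdd.foreach (or rdd.collect) to reach the individual message Strings you want to hand to parse.

    Below is a minimal sketch of what could go under /* NEW CODE */, reusing the ssc, kafkaBrokers, offsetMap and extractKeyValue values from above, and assuming each message is a JSON string; "someField" is just an illustrative field name, not something from your data:

    import net.liftweb.json._

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaBrokers, offsetMap, extractKeyValue)

    stream.foreachRDD { rdd =>
        // this block runs on the driver once per batch; the body of rdd.foreach runs on the executors
        rdd.foreach { case (_, message) =>
            implicit val formats = DefaultFormats
            val json = parse(message)                        // parse only accepts the raw String payload
            println((json \ "someField").extractOpt[String]) // "someField" is a hypothetical field name
        }
    }

    Also note that println inside rdd.foreach executes on the executors: with local[2] the output still shows up in your console, but on a cluster it lands in the executor logs rather than the driver's stdout, which is a common reason for "nothing is being printed".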