I have this code below:
object Test {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Spark").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(3))
    val kafkaBrokers = Map("metadata.broker.list" -> "HostName:9092")
    val offsetMap = Map(TopicAndPartition("topic_test", 0), 8)
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaBrokers, offsetMap)
    var offsetArray = Array[OffsetRange]()
    lines.transform { rdd =>
      offsetArray = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map {
      _.message()
    }.foreachRDD { rdd =>
      /* NEW CODE */
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
I have added the new code under the comment /* NEW CODE */. My understanding is that the lines val will contain a sequence of RDDs, each one built from the data fetched from the Kafka server every 3 seconds. I then grab the message from each record using the map function.
But I am a little confused about what the foreachRDD function does. Does it iterate over all of the RDDs in the lines DStream (which is what I am trying to do)? The parse function from the lift-json library only accepts a String, so I need to iterate over all of the RDDs and pass each String value to the parse function, which is what I attempted to do. But nothing is being printed out for some reason.
If you want to read data from a specific offset, you're using the wrong overload.
The one you need is this:
createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: (MessageAndMetadata[K, V]) ⇒ R): InputDStream[R]
You need a Map[TopicAndPartition, Long], built with -> so that each entry is a key/value pair:
val offsetMap = Map(TopicAndPartition("topic_test", 0) -> 8L)
And you need to pass a function which receives a MessageAndMetadata[K, V] and returns your desired type, for example:
val extractKeyValue: MessageAndMetadata[String, String] => (String, String) =
  msgAndMeta => (msgAndMeta.key(), msgAndMeta.message())
And use it:
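// This overload has five type parameters; the last one (R) is the handler's result type.
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaBrokers, offsetMap, extractKeyValue)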
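As for the foreachRDD part of the question: it is an output operation whose function is called once per batch interval (every 3 seconds here) with the RDD for that batch, so it does visit every RDD the DStream produces. A rough sketch of how the pieces might fit together is below. It assumes Spark 1.x with the spark-streaming-kafka (Kafka 0.8) artifact on the classpath; the object name DirectStreamFromOffset is made up for illustration, and the collect() call assumes each batch is small enough to bring back to the driver.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Hypothetical object name; host, topic, partition and offset are taken from the question.
object DirectStreamFromOffset {
  val kafkaBrokers = Map("metadata.broker.list" -> "HostName:9092")

  // Start reading partition 0 of topic_test at offset 8.
  val offsetMap = Map(TopicAndPartition("topic_test", 0) -> 8L)

  // Turn each Kafka record into a (key, message) pair.
  val extractKeyValue: MessageAndMetadata[String, String] => (String, String) =
    msgAndMeta => (msgAndMeta.key(), msgAndMeta.message())

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Spark").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaBrokers, offsetMap, extractKeyValue)

    // Called once per batch with that batch's RDD.
    lines.foreachRDD { rdd =>
      // The cast only works on the RDD that comes straight from the direct stream.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(println)

      // Assumption: batches are small, so collecting to the driver is acceptable.
      // Each element is a plain String here, so it could be handed to any
      // String-accepting function instead of println.
      rdd.map(_._2).collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Reading the offset ranges inside foreachRDD, before any other transformation, avoids the separate transform step and the mutable var. Whether collect() is appropriate depends on your data volume; for larger batches, doing the work inside rdd.foreach or rdd.foreachPartition on the executors may be a better fit.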