I have been trying to get Spark to read events from Kinesis recently, but am having problems receiving them. While Spark is able to connect to Kinesis and fetch metadata (e.g. the number of shards), it is not able to get any events from it: it always fetches zero elements back. There are no errors, just empty results.
I have used these guides [1 & 2] to get it working, but have not had much luck yet. I have also tried a couple of suggestions from SO [3]. The cluster has sufficient resources/cores available.
We have seen a protobuf version conflict between Spark and Kinesis, which could also be a cause of this behavior: Spark uses protobuf-java 2.5.0, while the Kinesis client probably uses protobuf-java-2.6.1.jar.
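If the protobuf conflict is the culprit, one way to test is to pin a single protobuf-java version in build.sbt. This is just a sketch; which version (2.5.0 or 2.6.1) is the right pin depends on which side of the conflict wins on your classpath:

```scala
// Force one protobuf-java version onto the classpath (sbt 0.13 syntax)
dependencyOverrides += "com.google.protobuf" % "protobuf-java" % "2.6.1"
```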
Just wondering if anyone has come across this behavior, or has got Spark working with Kinesis. I have tried with Spark 1.5.0 and Spark 1.6.0.
Answering my own question -
I have got some success with Spark Kinesis integration, and the key was the unionStreams.foreachRDD call. There are two versions of foreachRDD available.
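From the DStream API, the two overloads are:

```scala
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
```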
For some reason the first one does not get me any results, but changing to the second one fetches the results as expected. I have yet to explore the reason.
Adding a code snippet below for reference.
Also consider changing the dependency version; this helped me as well:
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0", // Doesnt work
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.4.1", // Works
Hope it helps someone :)
Thanks everyone for help.
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

// One receiver per stream; each delivers the raw record bytes
val kinesisStreams = (0 until numStreams).map { count =>
  KinesisUtils.createStream(
    ssc,
    consumerName,                          // Kinesis application (checkpoint) name
    streamName,
    endpointUrl,
    regionName,
    InitialPositionInStream.TRIM_HORIZON,  // start from the oldest available records
    kinesisCheckpointInterval,
    StorageLevel.MEMORY_AND_DISK_2
  )
}
val unionStreams = ssc.union(kinesisStreams)
println(s"========================")
println(s"Num of streams: ${numStreams}")
println(s"========================")
/* unionStreams.foreachRDD { rdd =>   // Doesn't work !!
  println(rdd.count)
  println("rdd isEmpty: " + rdd.isEmpty)
} */
unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { // Works, Yeah !!
  println(rdd.count)
  println("rdd isEmpty: " + rdd.isEmpty)
})
ssc.start()
ssc.awaitTermination()
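Once records start arriving, each element of the stream is an Array[Byte] holding a record's payload. A minimal sketch of decoding them, assuming your producer writes UTF-8 text:

```scala
import java.nio.charset.StandardCharsets

unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => {
  // Decode each record's payload as UTF-8 and print a small sample
  rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8))
     .take(5)
     .foreach(println)
})
```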