apache-spark spark-streaming flume

Flume+Spark - Storing DStream in HDFS


I have a Flume stream that I want to store in HDFS via Spark. Below is the Spark code I am running:

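import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePull {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumePollingEventCount <host> <port>")
      System.exit(1)
    }

    // Host and port of the Flume agent's Spark sink, taken from the command line
    val host = args(0)
    val port = args(1).toInt

    val batchInterval = Milliseconds(60000)
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    // Pull-based receiver: each element of the stream is a SparkFlumeEvent
    val stream = FlumeUtils.createPollingStream(ssc, host, port)

    stream.map(x => x + "!!!!")
          .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout")

    ssc.start()
    ssc.awaitTermination()
  }
}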

When I start my Spark Streaming job, it does store output in HDFS, but the output looks like this:

[root@sandbox ~]# hadoop fs -cat /user/root/spark/flume_Map_-1459450380000._Mapout/part-00000
org.apache.spark.streaming.flume.SparkFlumeEvent@1b9bd2c9!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@33fd3a48!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@35fd67a2!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@f9ed85f!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@58f4cfc!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@307373e!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@4ebbc8ff!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@a8905bb!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@29d73d64!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@71ff85b1!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@3ea261ef!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@16cbb209!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@17157890!!!!
org.apache.spark.streaming.flume.SparkFlumeEvent@29e41c7!!!!

It is storing the Flume event objects instead of the data coming from Flume. How do I get the data out of them?

Thanks
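Each output line has the shape produced by java.lang.Object.toString: the class name, an @ sign and the hash code in hex. A minimal plain-Scala reproduction (no Spark needed), using a hypothetical FakeEvent class as a stand-in for SparkFlumeEvent, which wraps a payload but does not override toString:

```scala
// Hypothetical stand-in for SparkFlumeEvent: holds a payload but does not
// override toString, so it falls back to java.lang.Object.toString.
class FakeEvent(val body: Array[Byte])

object ToStringDemo {
  def main(args: Array[String]): Unit = {
    val e = new FakeEvent("hello".getBytes("UTF-8"))
    // Interpolating the wrapper calls its toString: "<class name>@<hex hash>".
    val line = s"$e!!!!"
    println(line) // something like FakeEvent@<hex hash>!!!! (hash varies per run)
    // The payload itself never reaches the output string.
    println(line.contains("hello")) // false
  }
}
```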


Solution

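  • x + "!!!!" calls toString on the SparkFlumeEvent wrapper, which prints its class name and hash code rather than the payload. You need to extract the event body (a java.nio.ByteBuffer) from each SparkFlumeEvent and save that instead. For example, if your event body is UTF-8 text: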

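    import java.nio.charset.StandardCharsets

    // x.event is the wrapped AvroFlumeEvent; getBody returns a ByteBuffer
    stream.map(x => new String(x.event.getBody.array, StandardCharsets.UTF_8) + "!!!!")
          .saveAsTextFiles("/user/root/spark/flume_Map_", "_Mapout")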
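A slightly more defensive variant, as a sketch assuming UTF-8 text bodies: ByteBuffer.array returns the entire backing array (ignoring position, limit and arrayOffset) and throws for buffers without an accessible array, so this helper copies only the readable bytes from a duplicate of the buffer. FlumeBody and bodyAsString are hypothetical names for illustration, not part of the Spark or Flume API.

```scala
import java.nio.ByteBuffer
import java.nio.charset.{Charset, StandardCharsets}

// Hypothetical helper object, not part of Spark or Flume.
object FlumeBody {
  // Decode only the readable bytes (position until limit) of an event body.
  def bodyAsString(body: ByteBuffer,
                   charset: Charset = StandardCharsets.UTF_8): String = {
    val view = body.duplicate()                  // own position/limit; caller's buffer is untouched
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)                              // copy position..limit into `bytes`
    new String(bytes, charset)
  }

  def main(args: Array[String]): Unit = {
    // A buffer whose backing array is larger than its payload
    val backing = "xxhello!!".getBytes(StandardCharsets.UTF_8)
    val buf = ByteBuffer.wrap(backing, 2, 5)     // position = 2, limit = 7
    println(new String(buf.array(), StandardCharsets.UTF_8)) // xxhello!!
    println(bodyAsString(buf))                                // hello
  }
}
```

In the streaming job it would be used as stream.map(x => FlumeBody.bodyAsString(x.event.getBody) + "!!!!") before saveAsTextFiles. Because FlumeBody is a top-level object, the closure refers to it statically and does not need to serialize it.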