Tags: streaming, apache-spark, flume

Apache Spark Streaming application output not forwarded to the master


I'm trying to run the FlumeEventCount example, which is the following:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._

object FlumeEventCount {
  def main(args: Array[String]) {

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
      .set("spark.cleaner.ttl", "3")

    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream listening on 192.168.1.5:3564
    val stream = FlumeUtils.createStream(ssc, "192.168.1.5", 3564, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    stream.count().print()
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

My sbt file is the following:

import AssemblyKeys._

assemblySettings

name := "flume-test"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.0.0" exclude("org.apache.spark","spark-core") exclude("org.apache.spark", "spark-streaming_2.10")

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

I run the program with the following command:

/tmp/spark-1.0.0-bin-hadoop2/bin/spark-submit --class FlumeEventCount --master local --deploy-mode client /tmp/fooproj/target/scala-2.10/cert-log-manager-assembly-1.0.jar 

On the other side, the Flume application is sending everything correctly, and I can see in the logs that it is received.

I haven't made any changes to Spark's configuration or set up any environment variables; I just downloaded and unpacked the distribution.

Can someone tell me what I'm doing wrong?

//edit: When I execute Spark's bundled FlumeEventCount example, it works.

//edit2: If I remove the awaitTermination and add an ssc.stop, it prints everything a single time; I guess this happens because something is getting flushed on shutdown.


Solution

  • ... I should have learned to RTFM more carefully by now.

    Quoting from this page: https://spark.apache.org/docs/latest/streaming-programming-guide.html

    // Spark Streaming needs at least two working threads
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))

    I've been launching Spark with only one thread. Also, the following works fine for printing the event contents (the corrected launch command is shown after this snippet):

    stream.map(event => "Event: header: " + event.event.get(0).toString + " body: " + new String(event.event.getBody.array)).print()
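
    For the record, the reason one thread is not enough: the Flume receiver permanently occupies one worker thread, so with --master local there is no thread left to process the received batches, and nothing gets printed until shutdown. A minimal fix, reusing the class name and jar path from the question, is to request at least two local threads at submit time:

    /tmp/spark-1.0.0-bin-hadoop2/bin/spark-submit --class FlumeEventCount --master "local[2]" --deploy-mode client /tmp/fooproj/target/scala-2.10/cert-log-manager-assembly-1.0.jar

    Alternatively, the master can be set in code through the standard SparkConf.setMaster API, e.g.:

    val sparkConf = new SparkConf()
      .setAppName("FlumeEventCount")
      .setMaster("local[2]") // one thread for the Flume receiver, at least one for batch processing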