Search code examples
performanceapache-sparkmonitoringspark-streamingamazon-cloudwatch

spark streaming throughput monitoring


Is there a way to monitor the input and output throughput of a Spark cluster, to make sure the cluster is not flooded and overflowed by incoming data?

In my case, I set up Spark cluster on AWS EC2, so I'm thinking of using AWS CloudWatch to monitor the NetworkIn and NetworkOut for each node in the cluster.

But my idea seems to be not accurate and network does not meaning incoming data for Spark only, maybe also some other data would be calculated too.

Is there a tool or way to monitor specifically for Spark cluster streaming data status? Or there's already a built-in tool in Spark that I missed?


update: Spark 1.4 released, monitoring at port 4040 is significantly enhanced with graphical display


Solution

  • Spark has a configurable metric subsystem. By default it publishes a JSON version of the registered metrics on <driver>:<port>/metrics/json. Other metrics syncs, like ganglia, csv files or JMX can be configured.

    You will need some external monitoring system that collects metrics on a regular basis an helps you make sense of it. (n.b. We use Ganglia but there's other open source and commercial options)

    Spark Streaming publishes several metrics that can be used to monitor the performance of your job. To calculate throughput, you would combine:

    (lastReceivedBatch_processingEndTime-lastReceivedBatch_processingStartTime)/lastReceivedBatch_records

    For all metrics supported, have a look at StreamingSource

    Example: Starting a local REPL with Spark 1.3.1 and after executing a trivial streaming application:

    import org.apache.spark.streaming._
    val ssc = new StreamingContext(sc, Seconds(10))
    val queue = scala.collection.mutable.Queue(1,2,3,45,6,6,7,18,9,10,11)
    val q = queue.map(elem => sc.parallelize(Seq(elem)))
    val dstream = ssc.queueStream(q)
    dstream.print
    ssc.start
    

    one can GET localhost:4040/metrics/json and that returns:

    {
    version: "3.0.0",
    gauges: {
    local-1430558777965.<driver>.BlockManager.disk.diskSpaceUsed_MB: {
    value: 0
    },
    local-1430558777965.<driver>.BlockManager.memory.maxMem_MB: {
    value: 2120
    },
    local-1430558777965.<driver>.BlockManager.memory.memUsed_MB: {
    value: 0
    },
    local-1430558777965.<driver>.BlockManager.memory.remainingMem_MB: {
    value: 2120
    },
    local-1430558777965.<driver>.DAGScheduler.job.activeJobs: {
    value: 0
    },
    local-1430558777965.<driver>.DAGScheduler.job.allJobs: {
    value: 6
    },
    local-1430558777965.<driver>.DAGScheduler.stage.failedStages: {
    value: 0
    },
    local-1430558777965.<driver>.DAGScheduler.stage.runningStages: {
    value: 0
    },
    local-1430558777965.<driver>.DAGScheduler.stage.waitingStages: {
    value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingDelay: {
    value: 44
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingEndTime: {
    value: 1430559950044
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime: {
    value: 1430559950000
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_schedulingDelay: {
    value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_submissionTime: {
    value: 1430559950000
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_totalDelay: {
    value: 44
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime: {
    value: 1430559950044
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingStartTime: {
    value: 1430559950000
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_records: {
    value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_submissionTime: {
    value: 1430559950000
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.receivers: {
    value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.retainedCompletedBatches: {
    value: 2
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.runningBatches: {
    value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalCompletedBatches: {
    value: 2
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalProcessedRecords: {
    value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalReceivedRecords: {
    value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.unprocessedBatches: {
    value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.waitingBatches: {
    value: 0
    }
    },
    counters: { },
    histograms: { },
    meters: { },
    timers: { }
    }