Tags: scala, apache-spark, spark-streaming, checkpoint

Why do I see periodic pulses in the Processing Time chart when using mapWithState/checkpoint in Spark Streaming?


I wrote a stateful word-count Spark Streaming application that continuously receives data from Kafka. My code includes a mapWithState function and runs correctly. When I check the Streaming Statistics in the Spark UI, I see periodic pulses in the Processing Time chart. I think this may be caused by the use of checkpointing. I hope someone can explain this, many thanks!

The Streaming Statistics

and the completed batches table:

batches processing time

I found that batches taking about 1 second occur periodically. I then stepped into a 1-second batch and a sub-second batch, and found that the 1-second batch runs one more job than the other.

Comparing the two kinds of batches: 1-second batch vs. sub-second batch

It seems to be caused by the checkpoint, but I'm not sure.

Can anyone explain it in detail for me? THANKS!

Here is my code:

import kafka.serializer.StringDecoder 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.SparkConf 

object StateApp {

  def main(args: Array[String]) {

    if (args.length < 4) {
      System.err.println(
        s"""
           |Usage: KafkaSpark_008_test <brokers> <topics> <batchDuration> <checkpointPath>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <batchDuration> is the batch duration of spark streaming
           |  <checkpointPath> is the checkpoint directory
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics, bd, cpp) = args

    // Create context with the batch interval passed as an argument
    val sparkConf = new SparkConf().setAppName("KafkaSpark_080_test")
    val ssc = new StreamingContext(sparkConf, Seconds(bd.toInt))

    ssc.checkpoint(cpp)

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // test the messages' receiving speed
    messages.foreachRDD(rdd =>
      println(System.currentTimeMillis() + "\t" + System.currentTimeMillis() / 1000 + "\t" + (rdd.count() / bd.toInt).toString))

    // the messages' value type is "timestamp port word", eg. "1479700000000 10105 ABC"
    // wordDstream: (word, 1), eg. (ABC, 1)
    val wordDstream = messages.map(_._2).map(msg => (msg.split(" ")(2), 1))

    // this is from Spark Source Code example in Streaming/StatefulNetworkWordCount.scala
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream = wordDstream.mapWithState(
      StateSpec.function(mappingFunc))
    stateDstream.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }

}

Solution

  • These little spikes you see are caused by checkpointing your data to persistent storage. In order for Spark to perform stateful transformations, it needs to reliably store your data at every defined interval so that it can recover in case of failure.

    Notice that the spikes are consistent in time: they occur every 50 seconds. The interval is (batch interval * default multiplier), where the current default multiplier is 10. In your case this is 5 * 10 = 50, which explains why a spike is visible every 50 seconds. If you want to change how often this happens, see the sketch below.
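
    If you want checkpointing to happen less (or more) often, you can try overriding the checkpoint interval on the stateful stream itself. This is only a minimal sketch against the code above; the 100-second value is an illustrative choice, and whether MapWithStateDStream forwards the interval to its internal state stream should be verified against your Spark version:

        // Build the stateful stream as before
        val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc))

        // Override the default checkpoint interval (batch interval * 10).
        // The interval must be a multiple of the batch interval; with a
        // 5-second batch this checkpoints every 100 seconds instead of 50,
        // trading fewer pulses for more data to replay on recovery.
        stateDstream.checkpoint(Seconds(100))

        stateDstream.print()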