Tags: apache-spark, spark-streaming, spark-structured-streaming, spark-streaming-kafka

Why is a new batch triggered without any new offsets in the streaming source?


I have multiple Spark Structured Streaming jobs, and the usual behaviour I see is that a new batch is triggered only when there are new offsets in Kafka, which is the source of the streaming query.

But when I run the example that demonstrates arbitrary stateful operations using mapGroupsWithState, a new batch is triggered even if there is no new data in the streaming source. Why is that, and can it be avoided?

Update 1: I modified the example code and removed the state-related operations (updating and removing state). The function now simply outputs zero. A batch is still triggered every 10 seconds without any new data on the netcat server.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming._

object Stateful {

  def main(args: Array[String]): Unit = {

    val host = "localhost"
    val port = "9999"

    val spark = SparkSession
      .builder
      .appName("StructuredSessionization")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", true)
      .load()

    // Split the lines into words, treat words as sessionId of events
    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) =>
        line.split(" ").map(word => Event(sessionId = word, timestamp))
      }

    val sessionUpdates = events
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, Int](GroupStateTimeout.ProcessingTimeTimeout) {

        case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
          0
      }

    val query = sessionUpdates
      .writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .format("console")
      .start()

    query.awaitTermination()
  }
}

case class Event(sessionId: String, timestamp: Timestamp)

case class SessionInfo(
                        numEvents: Int,
                        startTimestampMs: Long,
                        endTimestampMs: Long)

Solution

  • The empty batches show up because of the timeout configured in the mapGroupsWithState call.

    The book "Learning Spark" (2nd edition) says:

    "The next micro-batch will call the function on this timed-out key even if there is no data for that key in that micro-batch. [...] Since the timeouts are processed during the micro-batches, the timing of their execution is imprecise and depends heavily on the trigger interval [...]."

    Because you set the timeout to GroupStateTimeout.ProcessingTimeTimeout, the query has to check for timed-out keys on every trigger, so a batch runs at your trigger interval of 10 seconds even when no new data has arrived. The alternative would be to set the timeout based on event time (i.e. GroupStateTimeout.EventTimeTimeout).

    The ScalaDocs on GroupState provide some more details:

    "When the timeout occurs for a group, the function is called for that group with no values, and GroupState.hasTimedOut() set to true."
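
To make the timed-out call described above concrete, here is a minimal sketch based on the code from the question. The names StatefulTimeoutSketch, mergeSession and updateSession, and the 30-second timeout, are made up for illustration and are not part of the original example. The comment about GroupStateTimeout.NoTimeout reflects my understanding of how Spark schedules batches for this operator; I have not checked it against every Spark version, so treat it as an assumption to verify.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, Trigger}

case class Event(sessionId: String, timestamp: Timestamp)
case class SessionInfo(numEvents: Int, startTimestampMs: Long, endTimestampMs: Long)

object StatefulTimeoutSketch {

  // Hypothetical pure helper: folds new event times into the previous session info.
  // Assumes at least one of `old` and `eventTimesMs` is non-empty.
  def mergeSession(old: Option[SessionInfo], eventTimesMs: List[Long]): SessionInfo = {
    val start = (old.map(_.startTimestampMs).toList ++ eventTimesMs).min
    val end = (old.map(_.endTimestampMs).toList ++ eventTimesMs).max
    SessionInfo(old.map(_.numEvents).getOrElse(0) + eventTimesMs.size, start, end)
  }

  // Hypothetical state function: returns the number of events seen for the session.
  def updateSession(
      sessionId: String,
      events: Iterator[Event],
      state: GroupState[SessionInfo]): Int = {
    if (state.hasTimedOut) {
      // Timed-out call: `events` is empty, only the stored state is available.
      val finalCount = state.getOption.map(_.numEvents).getOrElse(0)
      state.remove()
      finalCount
    } else {
      val updated = mergeSession(state.getOption, events.map(_.timestamp.getTime).toList)
      state.update(updated)
      // Only allowed with ProcessingTimeTimeout; remove this call when using NoTimeout.
      state.setTimeoutDuration("30 seconds")
      updated.numEvents
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StatefulTimeoutSketch")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._

    val events = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .option("includeTimestamp", true)
      .load()
      .as[(String, Timestamp)]
      .flatMap { case (line, ts) => line.split(" ").map(word => Event(word, ts)) }

    // Assumption: with GroupStateTimeout.NoTimeout instead, there are no timeouts
    // to check, so triggers without new data should not produce a batch.
    val sessionUpdates = events
      .groupByKey(_.sessionId)
      .mapGroupsWithState[SessionInfo, Int](GroupStateTimeout.ProcessingTimeTimeout)(updateSession _)

    val query = sessionUpdates.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

If the function never needs timeouts, as in the reduced version from the question that always returns 0, switching to GroupStateTimeout.NoTimeout (and dropping the setTimeoutDuration call) is probably the simplest way to stop the empty batches.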