
Accuracy of timing of the Trigger.ProcessingTime for Spark Structured Streaming


I have a Spark job that uses Structured Streaming to consume Kafka data. The basic code is as follows.

import java.time.Instant
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val rules_monitoring_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .format("memory")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      // Cache before count()/show() so the batch is not recomputed for each action
      batchDF.persist()
      printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, batchDF.count())
      batchDF.show()
      // ... Processing batchDF and populate a static dataframe
      batchDF.unpersist()
    }
  }
  .start()

while(rules_monitoring_stream.isActive) {
  Thread.sleep(240000)
  // Periodically load data from database
}

The basic idea is to consume the Kafka data with a 120-second trigger interval, process each micro-batch, and populate a static DataFrame.

As I understand it, with this design a micro-batch should arrive every 120 seconds, and batchDF contains the data ingested during that time window.

However, when I monitor the micro-batch arrival times with the printf statement, I get the following output.

At 1594968139, the microbatch has 110 records
At 1594968242, the microbatch has 118 records
At 1594968380, the microbatch has 243 records
At 1594968483, the microbatch has 117 records
At 1594968602, the microbatch has 59 records

The delta between adjacent micro-batch arrival times is not exactly 120 seconds: sometimes it is more than 120 s, sometimes less.

Is it normal? How to understand the time specified by Trigger.ProcessingTime? How to get a more accurate time window?

In addition, due to this inaccuracy, will it cause some data loss for the microbatches? By this I mean, some data is never captured by any microbatches?


Solution

  • "Is it normal? How to understand the time specified by Trigger.ProcessingTime?"

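    Yes, this is normal. Remember that the configured trigger fires the entire micro-batch of your streaming query, not the foreachBatch function alone. Before your printf runs, Spark still has to fetch the new offsets and plan the batch, and batchDF.isEmpty itself executes a job. Since the number of records and the processing durations vary from batch to batch, the moment at which the code inside foreachBatch runs is not at a fixed offset from the trigger either. Also note that if a batch takes longer than the trigger interval, the next one starts as soon as it finishes rather than at the next interval boundary.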

    "How to get a more accurate time window?"

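    The trigger itself fires quite accurately; what varies is the point at which you take the measurement. Consider measuring the trigger times in another way, e.g. by checking the time at the very beginning of your query (right after the readStream call), or by reading the timestamp field of the query's progress reports (for example rules_monitoring_stream.lastProgress), which records when each trigger started.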

    "In addition, due to this inaccuracy, will it cause some data loss for the microbatches? By this I mean, some data is never captured by any microbatches?"

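    No, no data is lost. Each micro-batch picks up from the Kafka offsets where the previous one stopped, so a batch that starts a little later simply contains the records that arrived in the meantime.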
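
To check that the triggers themselves are regular, one can compare each logged timestamp with the preceding multiple of 120 seconds. The sketch below assumes that Spark aligns processing-time triggers to multiples of the interval counted from the Unix epoch; treat that as an assumption, not as documented behaviour. `TriggerOffsets` and `delaysAfterBoundary` are hypothetical names introduced only for this illustration.

```scala
// Hypothetical helper: how long after the preceding trigger boundary
// was each timestamp logged?
object TriggerOffsets {

  // Assumption: trigger boundaries fall on multiples of intervalSeconds
  // since the Unix epoch, so the remainder is the delay after the boundary.
  def delaysAfterBoundary(epochSeconds: Seq[Long], intervalSeconds: Long): Seq[Long] =
    epochSeconds.map(t => t % intervalSeconds)

  def main(args: Array[String]): Unit = {
    // Timestamps copied from the printf output in the question
    val logged = Seq(1594968139L, 1594968242L, 1594968380L, 1594968483L, 1594968602L)
    val delays = delaysAfterBoundary(logged, 120L)

    logged.zip(delays).foreach { case (t, d) =>
      println(s"logged at $t, boundary at ${t - d}, delay ${d}s")
    }
  }
}
```

Under that assumption the delays come out as 19, 2, 20, 3 and 2 seconds. Every batch was logged shortly after a 120-second boundary, and only the time that passed before the printf ran differs, which is consistent with the explanation above.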