Tags: scala, spark-streaming, spark-structured-streaming

How can we get mini-batch time from Structured Streaming


In Spark Streaming (the DStream API), foreachRDD provides a time parameter, which can be used for different purposes: updating metadata, adding a time column to the RDD, and so on.

val stream = KafkaUtils.createDirectStream(...)
stream.foreachRDD { (rdd, time) => 
  // update metadata with time 
  // convert rdd to df and add time column
  // write df
 }    

In Structured Streaming, the API looks like this:

val df: Dataset[Row] = spark
  .readStream
  .format("kafka")
  .load()

df.writeStream.trigger(...)
  .outputMode(...)
  .start()

How can we get similar (mini-batch) time information in Structured Streaming, so it can be used in the same way?


Solution

  • I searched for a function that exposes the batch time, but it doesn't seem to exist yet in the Spark Structured Streaming APIs.

    Here's a workaround I used to get the batch time (let's suppose the batch interval is 2000 milliseconds) using foreachBatch, which gives us the batchId:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.streaming.Trigger

    val now = java.time.Instant.now
    val batchInterval = 2000L
    df.writeStream.trigger(Trigger.ProcessingTime(batchInterval))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // approximate batch time = stream start time + batchId * trigger interval
        println(now.plusMillis(batchId * batchInterval))
      }
      .outputMode(...)
      .start()
    

    Here's the output:

    2019-07-29T17:13:19.880Z
    2019-07-29T17:13:21.880Z
    2019-07-29T17:13:23.880Z
    2019-07-29T17:13:25.880Z
    2019-07-29T17:13:27.880Z
    2019-07-29T17:13:29.880Z
    2019-07-29T17:13:31.880Z
    2019-07-29T17:13:33.880Z
    2019-07-29T17:13:35.880Z
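    The arithmetic behind this workaround can be isolated in a small plain-Scala helper (the name `approxBatchTime` is my own, not a Spark API), which makes it easy to reuse and to see that the result is only an approximation derived from the start time and the trigger interval:

    ```scala
    import java.time.Instant

    // Hypothetical helper: derives the approximate wall-clock time of a
    // micro-batch from the stream start time, the batch id, and the
    // trigger interval in milliseconds.
    def approxBatchTime(streamStart: Instant, batchId: Long, batchIntervalMs: Long): Instant =
      streamStart.plusMillis(batchId * batchIntervalMs)

    // Example: with a 2000 ms trigger, batch 3 runs ~6 seconds after start.
    val start = Instant.parse("2019-07-29T17:13:19.880Z")
    println(approxBatchTime(start, 3L, 2000L)) // 2019-07-29T17:13:25.880Z
    ```

    Note that this drifts if a batch ever takes longer than the trigger interval, since it assumes every batch fires exactly on schedule.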

    I hope it helps!
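    As an alternative to computing the time yourself, a sketch of another approach: a StreamingQueryListener reports the actual trigger timestamp for each batch. Each QueryProgressEvent carries a StreamingQueryProgress with the batch id and its start timestamp (an ISO-8601 string), so no interval arithmetic is needed:

    ```scala
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener.{
      QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent
    }

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // event.progress.timestamp is the trigger's start time as reported by Spark
        println(s"batch ${event.progress.batchId} started at ${event.progress.timestamp}")
      }
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
    })
    ```

    The listener runs asynchronously, so it is better suited to metadata updates and monitoring than to adding a column inside the batch itself.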