apache-spark | databricks | spark-structured-streaming | delta-lake

Spark Structured Streaming shows no output on Databricks with Trigger.Once


I ran this on Databricks CE in a notebook, and it produces output to a Delta table. I am using the .format("rate") source approach.

// Default trigger: runs continuously in micro-batches
val streamingQuery = aggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Running the same code with Trigger.Once (shown below), however, produces no output! The query stops after one invocation, but the table remains empty.

What is the reason?

Surely it's not a CE limitation? Is it an error on my part?

  • Can this processing mode not be run in a notebook cell?
  • Is it a data volume issue?
  • This raises the question: can Trigger.Once only be used in the Databricks environment? I assume I can also run this as a jar under Linux.

Could it be a bug? Here is the code:

import org.apache.spark.sql.streaming.Trigger

// Same query, but process the available data once, then stop
val streamingQuery = aggregatesDF.writeStream
  .trigger(Trigger.Once())
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

.format("rate"), could that be the issue? Which is handy for prototyping.


Solution

  • Trigger.Once isn't limited to Databricks; it's standard functionality of Spark Structured Streaming. The problem is that it requires a data source with a history, because each Trigger.Once run processes only the data that arrived since the last execution. The rate source has no history: it generates rows only while a query is running and starts from scratch every time, so a query that fires a single batch and stops sees nothing. It's easy to show:

    df = spark.readStream.format("rate").load()
    query = df.writeStream.trigger(once=True) \
      .option("checkpointLocation", "1.cp") \
      .outputMode("append").start("1.parquet")
    query.awaitTermination()  # wait for the single batch to complete
    spark.read.parquet("1.parquet").show()
    +---------+-----+
    |timestamp|value|
    +---------+-----+
    +---------+-----+
    
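    For contrast, running the same rate query continuously for a few seconds does produce rows, since rate generates data only while a query is running. A minimal sketch (reusing df from above; the paths and sleep duration are arbitrary):

    import time

    q = df.writeStream.option("checkpointLocation", "2.cp") \
      .outputMode("append").start("2.parquet")
    time.sleep(10)  # let the rate source emit a few rows
    q.stop()
    spark.read.parquet("2.parquet").show(3)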

    If you want to keep using rate for experimenting, it's better to create an additional table that serves as a buffer between rate and your code. Something like this:

    # Create a buffer table: run this query continuously (a one-shot
    # trigger against rate would write nothing, as shown above) and
    # stop it once enough data has accumulated
    df = spark.readStream.format("rate").load()
    df.writeStream.option("checkpointLocation", "buffer.cp") \
      .format("delta").outputMode("append").start("1.delta")

    # Use the buffer table: Delta keeps a history, so Trigger.Once
    # picks up whatever was appended since the last run
    bufferDF = spark.readStream.format("delta").load("1.delta")
    aggregatesDF = bufferDF....  # your aggregation logic
    streamingQuery = aggregatesDF.writeStream \
      .trigger(once=True) \
      .option("checkpointLocation", "agg.cp") \
      .foreachBatch(upsertToDelta) \
      .outputMode("update") \
      .start()
    
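    Each Trigger.Once run of the downstream query then processes only the data appended to 1.delta since its previous run, which is the behaviour the question expected from rate.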

    P.S. It makes no sense to use .format("delta") together with .foreachBatch; the latter takes precedence.
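
    The question's upsertToDelta isn't shown; a minimal sketch of such a function, assuming a target Delta table at a hypothetical path with a hypothetical key column, might look like this:

    from delta.tables import DeltaTable

    def upsertToDelta(microBatchDF, batchId):
        # Merge each micro-batch into the target table, updating
        # existing keys and inserting new ones
        target = DeltaTable.forPath(spark, "/path/to/aggregates")
        (target.alias("t")
            .merge(microBatchDF.alias("s"), "t.key = s.key")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())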