
Spark Structured Streaming shows no output on Databricks with Trigger.Once

I ran this on Databricks CE in a notebook, and it writes output to a Delta table. I am using the .format("rate") source.

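val streamingQuery = aggregatesDF.writeStream
  .foreachBatch(upsertToDelta _) // upsertToDelta is defined elsewhere in the notebook
  .outputMode("update")
  .start() // default trigger: each micro-batch starts as soon as the previous one finishes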

Running it with Trigger.Once, however, produces no output! The query stops after one invocation, but the table remains empty.

What is the reason?

Surely this is not a CE limitation? Or is it an error?

  • Can this processing mode not be run in a cell?
  • Is it a data volume issue?
  • This also raises the question: can Trigger.Once only be used in the Databricks environment? I assume I could run this as a JAR on Linux.

Could it be a bug? Here is the code:

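import org.apache.spark.sql.streaming.Trigger

val streamingQuery = aggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .trigger(Trigger.Once()) // process whatever is available as one micro-batch, then stop
  .start()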

Could .format("rate"), which is handy for prototyping, be the issue?


  • Trigger.Once isn't limited to Databricks; it's standard functionality of Spark Structured Streaming. The problem is that it needs a data source that keeps a history, because it triggers processing of the data that has arrived since the last execution. The rate source doesn't have a history and always starts from the beginning. It's easy to show:

    df = spark.readStream.format("rate").load()
    df.writeStream.trigger(once=True).option("checkpointLocation", "1.cp") \
      .format("delta").start("1.delta")  # "1.delta" is a hypothetical output path

    If you want to keep using rate for experimenting, it's better to create an additional table that acts as a buffer between rate and your code. Something like this:

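    buffer_path = "/tmp/rate_buffer"  # hypothetical location of the buffer Delta table
    # Create a buffer table: rate -> Delta
    df = spark.readStream.format("rate").load()
    df.writeStream.trigger(once=True).option("checkpointLocation", "buffer.cp") \
      .format("delta").start(buffer_path)

    # Use the buffer table (once the buffer query above has finished)
    bufferDF = spark.readStream.format("delta").load(buffer_path)
    aggregatesDF = bufferDF....  # your aggregations go here
    streamingQuery = (aggregatesDF.writeStream
      .foreachBatch(upsertToDelta)  # Python version of the question's merge function
      .outputMode("update")
      .trigger(once=True)
      .option("checkpointLocation", "aggregates.cp")  # hypothetical checkpoint path
      .start())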

    P.S. It makes no sense to use .format("delta") together with .foreachBatch; the latter takes precedence.
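As a rough illustration of the explanation above, here is a toy model in plain Python (no Spark involved) of what a single Trigger.Once run does: read everything between the offset committed in the checkpoint and the source's latest offset as one micro-batch, commit the new offset, and stop. ToyRateSource, ToyBufferTable and run_trigger_once are hypothetical names, and the rate source is deliberately simplified to one whose rows only exist while a query runs, so nothing is available when the single batch is planned. It is a sketch under those assumptions, not how Spark actually implements its sources.

```python
# Toy model of Trigger.Once offset handling. This is NOT Spark code;
# every class and function name here is hypothetical.


class ToyRateSource:
    """Simplified live source with no history: rows only exist while a query
    is running, so when a single Trigger.Once batch is planned, none exist yet."""

    def latest_offset(self):
        return 0  # nothing has been generated at planning time

    def read(self, start, end):
        return []


class ToyBufferTable:
    """Simplified Delta buffer table: appended rows persist between runs."""

    def __init__(self):
        self.rows = []

    def append(self, new_rows):
        self.rows.extend(new_rows)

    def latest_offset(self):
        return len(self.rows)  # offset = number of rows written so far

    def read(self, start, end):
        return self.rows[start:end]


def run_trigger_once(source, checkpoint, sink):
    """One Trigger.Once run: process [committed offset, latest offset) as a
    single micro-batch, write it to the sink, commit the new offset, stop."""
    start = checkpoint.get("offset", 0)
    end = source.latest_offset()
    batch = source.read(start, end)
    sink.extend(batch)
    checkpoint["offset"] = end
    return len(batch)


# Rate-like source: the single run finds nothing, so the sink stays empty.
rate_sink, rate_checkpoint = [], {}
print(run_trigger_once(ToyRateSource(), rate_checkpoint, rate_sink))  # 0

# Buffer table: each run picks up only the rows added since the last commit.
buffer = ToyBufferTable()
agg_sink, agg_checkpoint = [], {}
buffer.append([1, 2, 3])
print(run_trigger_once(buffer, agg_checkpoint, agg_sink))  # 3
buffer.append([4, 5])
print(run_trigger_once(buffer, agg_checkpoint, agg_sink))  # 2
print(run_trigger_once(buffer, agg_checkpoint, agg_sink))  # 0
print(agg_sink)  # [1, 2, 3, 4, 5]
```

The part that matters in this model is the committed offset: a persistent buffer lets each Trigger.Once run resume where the previous one stopped, which is the motivation for putting a Delta table between rate and the aggregation.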