I ran this on Databricks CE in a notebook and it produces output to a Delta table. I am using the .format("rate") approach.
val streamingQuery = aggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
Running the version below, however, produces no output! It stops after one invocation, but the table remains empty.
What is the reason? Surely not a CE limitation? An error on my part? Or could it be a bug? Here goes:
import org.apache.spark.sql.streaming.Trigger

val streamingQuery = aggregatesDF.writeStream
  .trigger(Trigger.Once())
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
Could .format("rate"), which is handy for prototyping, be the issue?
Trigger.Once isn't limited to Databricks; it's standard functionality of Spark Structured Streaming. The problem is that it requires a data source that has a history, because it triggers processing of the data that arrived since the last execution. The rate source doesn't have a history; it always starts from the beginning. It's easy to show:
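df = spark.readStream.format("rate").load()
query = df.writeStream.trigger(once=True).option("checkpointLocation", "1.cp") \
    .format("parquet").outputMode("append").start("1.parquet")
query.awaitTermination()  # wait for the single batch to finish before reading
spark.read.parquet("1.parquet").show()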
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+
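To make the reasoning concrete, here is a toy model in plain Python. It is not Spark code, and every class and function name in it is made up for illustration. It assumes a one-time trigger simply reads everything between the last checkpointed offset and the source's latest offset, and that a rate-like source has generated nothing at the moment the query starts.

```python
class TableSource:
    """Toy source with history: rows exist independently of any query."""

    def __init__(self, rows):
        self.rows = list(rows)

    def latest_offset(self):
        return len(self.rows)

    def read(self, start, end):
        return self.rows[start:end]


class RateLikeSource:
    """Toy rate-like source: rows are counted from the query's own start."""

    def __init__(self):
        self.elapsed_seconds = 0  # assumption: nothing generated at query start

    def latest_offset(self):
        return self.elapsed_seconds

    def read(self, start, end):
        return list(range(start, end))


def run_once(source, checkpoint):
    """Process one batch covering (last committed offset, latest offset), then stop."""
    start = checkpoint.get("offset", 0)
    end = source.latest_offset()
    batch = source.read(start, end)
    checkpoint["offset"] = end  # remember progress for the next run
    return batch


table = TableSource(["a", "b", "c"])
table_checkpoint = {}
first_run = run_once(table, table_checkpoint)   # picks up the existing history
table.rows.append("d")
second_run = run_once(table, table_checkpoint)  # picks up only the new row

rate_run = run_once(RateLikeSource(), {})       # nothing to read yet
```

In this model the table-backed source returns its accumulated rows on each run, while the rate-like source returns an empty batch, which matches the empty output shown above.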
If you want to continue to use rate for experimenting, it's better to create an additional table that will be a buffer between rate and your code. Something like this:
# Create a buffer table. No one-time trigger here: rate has no history,
# so this query is left running to keep filling the buffer.
df = spark.readStream.format("rate").load()
df.writeStream.option("checkpointLocation", "buffer.cp") \
    .format("delta").outputMode("append").start("1.delta")
# Use buffer table
bufferDF = spark.readStream.format("delta").load("1.delta")
aggregatesDF = bufferDF....
streamingQuery = aggregatesDF.writeStream \
    .trigger(once=True) \
    .foreachBatch(upsertToDelta) \
    .outputMode("update") \
    .start()
P.S. It makes no sense to use .format("delta") together with .foreachBatch; the latter takes precedence.