I am writing a continuous application with Apache Spark. In the structured streaming case I am attempting to read from a Delta table, perform a streaming aggregation on event time via a time window, and write the result to a Delta table in append mode. My expectation, per the documentation, is that in append mode only the finalized aggregate for a time window is written to the sink. That has not been my experience. Instead, I see records like the following in my target Delta table, regardless of the many stream configurations I have tried (windowDuration=5 minutes, slideDuration=20 seconds).
As you can see from the above picture, the same time window contributes many records to the sink. I confirmed that each micro-batch outputs at most a single record per time window, but a single time window can contribute output records across many micro-batches (and the number of batches is not obviously consistent). Here is the core of the streaming aggregation code.
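For scale: with these parameters, fan-out across *different* windows is expected, since each event falls into windowDuration/slideDuration overlapping windows. The surprise is multiple rows for the *same* window, not the overlap itself. A quick sanity check of the overlap count (plain Python, mirroring the parameters above):

```python
# Number of overlapping sliding windows that contain any single event,
# when windowDuration is an exact multiple of slideDuration.
window_duration_secs = 5 * 60   # windowDuration = 5 minutes
slide_duration_secs = 20        # slideDuration = 20 seconds

windows_per_event = window_duration_secs // slide_duration_secs
print(windows_per_event)  # 15 distinct windows cover each event
```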
```python
import pyspark.sql.functions as f

output_schema = create_trades_data_features_schema()
features_sdf = (
    trades_sdf
    .withWatermark("event_datetime", f"{trades_stream_watermark_secs} seconds")
    .withColumn("time_window", f.window(timeColumn=f.col("event_datetime"),
                                        windowDuration=f"{analysis_window_length_secs} seconds",
                                        slideDuration=f"{analysis_window_hop_size_secs} seconds"))
    .groupBy("time_window")
    .applyInPandas(lambda pdf: generate_trades_data_features(pdf, output_schema, data_type_cast),
                   output_schema)
)
```
The Pandas UDF creates some variables holding scalar values, constructs a Pandas DataFrame of shape [1,N], and outputs that as the result. That is, it's returning a single row. The only thing I'm grouping on is the time window. How could I be getting multiple records for the same time window? I have created and closed the stream in numerous ways and received the same result each time (e.g., per the Delta Lake docs, per the structured streaming guide, and across the read/load/table/toTable API options, trying every option configuration I could find... yes, many hours of brute force). I have also tried various ranges of values with the watermark duration and trigger period; none had an impact.
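To illustrate the shape of the UDF, here is a hypothetical stand-in for generate_trades_data_features (the column names and features are made up, not my actual implementation): it receives the full pandas DataFrame for one group and returns a [1, N] frame.

```python
import pandas as pd

def generate_trades_data_features_sketch(pdf: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical feature computation: collapse the group to scalar
    # aggregates and return a single-row DataFrame, as described above.
    return pd.DataFrame({
        "time_window": [pdf["time_window"].iloc[0]],
        "trade_count": [len(pdf)],
        "mean_price": [pdf["price"].mean()],
    })

# Called directly on a pandas DataFrame, the way applyInPandas would per group:
group = pd.DataFrame({"time_window": ["w1"] * 3, "price": [10.0, 11.0, 12.0]})
print(generate_trades_data_features_sketch(group).shape)  # (1, 3)
```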
Is this the expected behavior in append mode (i.e., multiple records for the same time window)?
Edit: I am using the Databricks runtime version 8.3 ML. It has Spark version '3.1.1'.
Edit 2: I'm tentatively considering whether this JIRA issue is related: https://issues.apache.org/jira/browse/SPARK-25756
To keep this from joining the legion of questions that never get answered or followed up on, I'll jot down my tentative conclusion below and update it if I learn more. It may be wrong; please don't let it deter other answers/comments.
Overall, this is not intended behavior. Each micro-batch is sent to the Pandas UDF individually (i.e., on every trigger, the current micro-batch and only that micro-batch is passed to the UDF), and each produces a record that is written to the sink despite the stream being in append mode. The developers have noted the issue and at least one JIRA ticket was created to resolve it, but that thread of work appears to be inactive.
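One workaround I'd consider (an assumption on my part, not something confirmed anywhere): since each micro-batch emits at most one row per window, the duplicates can be collapsed downstream by keeping a single row per time_window. A batch-side sketch in plain pandas (the `_ingest_ts` column is hypothetical; you would add something like it when writing each micro-batch so the latest row wins):

```python
import pandas as pd

def dedupe_features(df: pd.DataFrame) -> pd.DataFrame:
    # Keep the most recently written row per window; "_ingest_ts" is a
    # hypothetical per-micro-batch write timestamp.
    return (df.sort_values("_ingest_ts")
              .drop_duplicates(subset="time_window", keep="last")
              .sort_values("time_window")
              .reset_index(drop=True))

rows = pd.DataFrame({
    "time_window": ["w1", "w1", "w2"],
    "trade_count": [5, 9, 3],
    "_ingest_ts": [1, 2, 1],
})
print(dedupe_features(rows)[["time_window", "trade_count"]].to_dict("list"))
# {'time_window': ['w1', 'w2'], 'trade_count': [9, 3]}
```

The same idea could be expressed as a Delta MERGE keyed on time_window, which would avoid rewriting the whole table.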
Other data points and recommendation:
Highly interested in hearing about potential workarounds or if I came to an incorrect conclusion above. Cheers.