Tags: pyspark, spark-structured-streaming

Spark structured streaming in append mode outputting many rows per single time window


I am writing a continuous application with Apache Spark. In the structured streaming case, I am attempting to read from a Delta table, perform a streaming aggregation on event time via a time window, and write the result to a Delta table in append mode. My expectation, per the documentation, is that in append mode only the finalized aggregate for a time window is written to the sink. That has not been my experience. Instead, I see records like the following in my target Delta table, regardless of the many stream configurations I have tried (here windowDuration=5 minutes, slideDuration=20 seconds).

[Image: example output from the stream]

As you can see from the above picture, the same time window contributes many records to the sink. I confirmed that at most a single record for a given time window is output from each micro batch, but a time window can contribute output records across many micro batches (with no obvious pattern in how many). Here is the core of the streaming aggregation code.

import pyspark.sql.functions as f

output_schema = create_trades_data_features_schema()

# Watermark on event time, assign a sliding time window, then run a grouped Pandas UDF per window.
features_sdf = (trades_sdf.withWatermark("event_datetime", f"{trades_stream_watermark_secs} seconds")
                          .withColumn('time_window', f.window(timeColumn=f.col('event_datetime'),
                                                              windowDuration=f"{analysis_window_length_secs} seconds",
                                                              slideDuration=f"{analysis_window_hop_size_secs} seconds"))
                          .groupBy('time_window')
                          .applyInPandas(lambda pdf: generate_trades_data_features(pdf, output_schema, data_type_cast), output_schema))
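
For reference, the stream is read and started roughly as follows. This is only a sketch: the source/target table names, checkpoint location, and trigger interval are hypothetical stand-ins, since the exact read/write calls are not reproduced here.

# Hypothetical wiring around the aggregation above (in the real job, trades_sdf is defined first).
trades_sdf = (spark.readStream
                   .format("delta")
                   .table("trades"))                     # hypothetical source Delta table

query = (features_sdf.writeStream
                     .format("delta")
                     .outputMode("append")               # expecting only finalized windows per the docs
                     .option("checkpointLocation", "/tmp/checkpoints/trades_features")  # hypothetical path
                     .trigger(processingTime="20 seconds")                              # hypothetical trigger
                     .toTable("trades_features"))        # hypothetical target Delta table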

The Pandas UDF creates some variables holding scalar values, constructs a Pandas DataFrame of shape [1, N], and returns that as its result. That is, it returns a single row per group, and the only thing I'm grouping on is the time window. How could I be getting multiple records for the same time window? I have created and closed the stream in numerous ways and received the same result each time: per the Delta Lake docs, per the structured streaming guide, and across the read/load/table/toTable API options, trying every option configuration I could find (yes, many hours of brute force). I have also tried various ranges of values for the watermark duration and trigger period; none had an impact.
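
To illustrate the shape of that UDF, here is a minimal hypothetical stand-in for generate_trades_data_features. The feature logic and column names (e.g., price) are invented, and output_schema/data_type_cast are ignored for brevity; the relevant point is that it returns exactly one row per group.

import pandas as pd

def generate_trades_data_features(pdf: pd.DataFrame, output_schema, data_type_cast) -> pd.DataFrame:
    # pdf holds every row of a single time_window group within the current micro batch
    return pd.DataFrame({
        "time_window": [pdf["time_window"].iloc[0]],
        "trade_count": [len(pdf)],                # example scalar feature
        "avg_price":   [pdf["price"].mean()],     # assumes a 'price' column exists
    })                                            # shape [1, N]: a single row per group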

Is this the expected behavior in append mode (i.e., multiple records for the same time window)?

Edit: I am using Databricks Runtime 8.3 ML, which has Spark version 3.1.1.

Edit 2: I'm tentatively considering whether this issue is related: https://issues.apache.org/jira/browse/SPARK-25756


Solution

  • To keep this from joining the legion of unanswered or never-followed-up questions, I'll jot down my tentative conclusion below and update it if I learn more. It may be wrong; please don't let that deter other answers or comments.

    Overall, this is not intended behavior. Each micro batch is sent individually to the Pandas UDF (i.e., on every trigger the current micro batch, and only that micro batch, is passed to the UDF), and each of those calls produces a record in the result table that is written to the sink, despite the query running in append mode. The issue has been noted by the developers and at least one JIRA issue was created to resolve it; that thread of work appears to be inactive.

    Other data points and recommendation:

    • Multiple issues in different forums (e.g., Databricks), as well as the JIRA issue linked above, directly reference or provide explicit examples of this bug in Spark.
    • The issue has been present since 2018; a fix appeared to be targeted for version 3.1.2, but the JIRA issue was bulk-closed and I see no continuation of the discussion or work.
    • At this time, Spark Structured Streaming for Python developers supports only trivial transformations on streaming aggregations, i.e., the functions you can run on a GroupedData object other than apply and applyInPandas (see the sketch after this list).
    • If you're looking for a streaming compute engine for a non-trivial application, don't expect support from the Python Spark API until this issue is resolved.
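
    For contrast, here is a minimal sketch of the "trivial" path that does behave as documented in append mode: built-in aggregate functions over a windowed groupBy with a watermark, and no applyInPandas. The watermark value, table name, checkpoint path, and price column are hypothetical.

    agg_sdf = (trades_sdf
        .withWatermark("event_datetime", "90 seconds")
        .groupBy(f.window("event_datetime", "5 minutes", "20 seconds"))
        .agg(f.count("*").alias("trade_count"),       # built-in aggregates only
             f.avg("price").alias("avg_price")))      # assumes a 'price' column

    (agg_sdf.writeStream
        .format("delta")
        .outputMode("append")                         # finalized windows are emitted once
        .option("checkpointLocation", "/tmp/checkpoints/trades_agg")  # hypothetical path
        .toTable("trades_features_agg"))              # hypothetical target table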

    Highly interested in hearing about potential workarounds or if I came to an incorrect conclusion above. Cheers.