Tags: scala, apache-spark, spark-structured-streaming

Why does streaming Dataset fail with "Complete output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets... "?


I use Spark 2.2.0 and get the following error with Spark Structured Streaming on windows:

Complete output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark.


Solution

  • Complete output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

    Streaming aggregations require that you tell the Spark Structured Streaming engine when to output the aggregation (per the so-called output mode), since data that belongs to an aggregation might be late and become available only after some time.

    That "some time" is the allowed event lateness: the watermark trails the latest event time seen so far by a delay you specify, and events older than the watermark are treated as too late.

    That's why you have to specify a watermark: it lets Spark drop late events and stop accumulating state that could eventually lead to an OutOfMemoryError or similar.

    With that said, you should use the withWatermark operator on your streaming Dataset.
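    As a minimal sketch of what that could look like (not taken from the question, which shows no code): the object and method names, the "timestamp" column, the 10-second delay, and the 5-second window are all assumptions picked for illustration. The built-in rate source is used only to have something to stream and assumes a Spark version that ships it; substitute your own streaming Dataset with an event-time column. The sketch writes in append mode, where Spark waits for the watermark to pass a window before emitting it; swap in the output mode your query needs.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

// Hypothetical helper object, for illustration only.
object WatermarkSketch {

  // Counts events per 5-second event-time window and tolerates events
  // that arrive up to 10 seconds late. withWatermark is applied before
  // the aggregation and on the same column that is used for the window.
  def windowedCounts(events: DataFrame, eventTimeColumn: String = "timestamp"): DataFrame =
    events
      .withWatermark(eventTimeColumn, "10 seconds")
      .groupBy(window(col(eventTimeColumn), "5 seconds"))
      .count()

  // Starts a console query over the rate source (assumed to be available),
  // which produces rows with `timestamp` and `value` columns.
  def run(spark: SparkSession): StreamingQuery = {
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    windowedCounts(events).writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", "false")
      .start()
  }
}
```

    Calling WatermarkSketch.run(spark).awaitTermination() from a driver program would keep the query running. Windows only show up on the console once the watermark has moved past their end.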

    Quoting the documentation of withWatermark: "Defines an event time watermark for this Dataset. A watermark tracks a point in time before which we assume no more late data is going to arrive."

    And quoting along...

    Spark will use this watermark for several purposes:

    • To know when a given time window aggregation can be finalized and thus can be emitted when using output modes that do not allow updates.
    • To minimize the amount of state that we need to keep for on-going aggregations, mapGroupsWithState and dropDuplicates operators.

    The current watermark is computed by looking at the MAX(eventTime) seen across all of the partitions in the query minus a user specified delayThreshold. Due to the cost of coordinating this value across partitions, the actual watermark used is only guaranteed to be at least delayThreshold behind the actual event time. In some cases we may still process records that arrive more than delayThreshold late.
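    The arithmetic in that description can be sketched in plain Scala, without Spark. This is an idealized model only: as the quote says, the watermark Spark actually uses may lag further behind, and the exact rule Spark applies when dropping records may differ from the simple comparison here. The object name, method names, and event times are made up for illustration.

```scala
import java.time.{Duration, Instant}

// Hypothetical model of the watermark formula, not Spark's implementation.
object WatermarkArithmetic {

  // Idealized watermark: the latest event time seen so far minus the delay threshold.
  def watermark(eventTimes: Seq[Instant], delayThreshold: Duration): Instant =
    eventTimes.maxBy(_.toEpochMilli).minus(delayThreshold)

  // In this model an event is too late once its event time falls before the watermark.
  def isTooLate(eventTime: Instant, currentWatermark: Instant): Boolean =
    eventTime.isBefore(currentWatermark)

  // Made-up event times; the latest one is 12:00:07.
  val seen: Seq[Instant] = Seq(
    Instant.parse("2017-01-01T12:00:00Z"),
    Instant.parse("2017-01-01T12:00:07Z"),
    Instant.parse("2017-01-01T12:00:04Z")
  )

  // 12:00:07 minus 10 seconds gives 11:59:57.
  val current: Instant = watermark(seen, Duration.ofSeconds(10))
}
```

    With these values, an event stamped 11:59:50 is before the watermark and would be considered too late, while one stamped 11:59:58 would still be accepted.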

    For more, see the "Handling Late Data and Watermarking" section of the Structured Streaming Programming Guide.