Tags: apache-spark, spark-structured-streaming, watermark

Spark Structured Streaming - watermark being ignored, old data being output


I have a sample Spark Structured Streaming application, and I'm trying to implement and test a watermark to handle late-arriving data.

Somehow, the watermark is being ignored: older data is still being published even when its timestamp is earlier than (max(event timestamp) - watermark delay).

Here is the code:

from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import LongType, StructField, StructType, TimestampType

# spark (a SparkSession), kafkaBrokers, topic and checkpoint_path are defined elsewhere.

schema = StructType([
    StructField("temparature", LongType(), False),
    StructField("ts", TimestampType(), False),
    StructField("insert_ts", TimestampType(), False),
])

# Read the Kafka topic and parse the JSON payload into a struct column.
streamingDataFrame = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaBrokers)
    .option("group.id", "watermark-grp")
    .option("subscribe", topic)
    .option("failOnDataLoss", "false")
    .option("includeHeaders", "true")
    .option("startingOffsets", "latest")
    .load()
    .select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))
)

# Flatten the struct; the event time is renamed to "timestamp".
resultC = streamingDataFrame.select(
    col("parsed_value.ts").alias("timestamp"),
    col("parsed_value.temparature").alias("temparature"),
    col("parsed_value.insert_ts").alias("insert_ts"),
)

# 10-minute watermark; 10-minute windows sliding every 5 minutes.
resultM = (
    resultC
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window(resultC.timestamp, "10 minutes", "5 minutes"))
    .agg({"temparature": "sum"})
)

resultMF = resultM.select(
    col("window.start").alias("startOfWindowFrame"),
    col("window.end").alias("endOfWindowFrame"),
    col("sum(temparature)").alias("Sum_Temperature"),
)

result = (
    resultMF.writeStream
    .outputMode("update")
    .option("numRows", 1000)
    .option("truncate", "false")
    .format("console")
    .option("checkpointLocation", checkpoint_path)
    .queryName("sum_temparature")
    .start()
)

result.awaitTermination()

Data written to the Kafka topic:

+---------------------------------------------------------------------------------------------------+----+
|value                                                                                              |key |
+---------------------------------------------------------------------------------------------------+----+
|{"temparature":7,"insert_ts":"2023-03-16T15:32:35.160-07:00","ts":"2022-03-16T16:12:00.000-07:00"} |null|
|{"temparature":15,"insert_ts":"2023-03-16T15:33:24.933-07:00","ts":"2022-03-16T16:12:00.000-07:00"}|null|
|{"temparature":11,"insert_ts":"2023-03-16T15:37:36.844-07:00","ts":"2022-03-15T16:12:00.000-07:00"}|null|
|{"temparature":8,"insert_ts":"2023-03-16T15:41:33.312-07:00","ts":"2022-03-16T10:12:00.000-07:00"} |null|
|{"temparature":14,"insert_ts":"2023-03-16T15:42:27.627-07:00","ts":"2022-03-16T10:10:00.000-07:00"}|null|
|{"temparature":6,"insert_ts":"2023-03-16T15:44:44.508-07:00","ts":"2022-03-16T11:16:00.000-07:00"} |null|
|{"temparature":19,"insert_ts":"2023-03-16T15:46:15.486-07:00","ts":"2022-03-16T11:16:00.000-07:00"}|null|
|{"temparature":3,"insert_ts":"2023-03-16T16:10:15.676-07:00","ts":"2022-03-16T16:16:00.000-07:00"} |null|
|{"temparature":13,"insert_ts":"2023-03-16T16:11:52.194-07:00","ts":"2022-03-14T16:16:00.000-07:00"}|null|
+---------------------------------------------------------------------------------------------------+----+

Output of the streaming query:

-------------------------------------------
Batch: 14
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
+-------------------+-------------------+---------------+
|2022-03-16 16:15:00|2022-03-16 16:25:00|3              |
|2022-03-16 16:10:00|2022-03-16 16:20:00|3              |
+-------------------+-------------------+---------------+

-------------------------------------------
Batch: 15
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
+-------------------+-------------------+---------------+
|2022-03-14 16:15:00|2022-03-14 16:25:00|13             |
|2022-03-14 16:10:00|2022-03-14 16:20:00|13             |
+-------------------+-------------------+---------------+

The record with "ts":"2022-03-14T16:16:00.000-07:00" is the last record put into the Kafka topic. It is 2 days older than the latest event time, and yet it is still being output (Batch 15).

Per my understanding, a window gets closed when max(ts) - watermark delay > end time of the window. The max(ts) seen before that record arrived is 2022-03-16 16:16:00 (from the record inserted at 16:10:15), so any data with a ts earlier than max(ts) - 10 minutes = 2022-03-16 16:06:00 should be ignored.
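The expectation described above can be replayed in plain Python to see which records should count as late. This is a simplified model, not Spark's implementation, and rests on assumptions: one record per micro-batch (as the batch output suggests), a watermark recomputed at the start of each batch from the maximum event time of earlier batches, window starts aligned to 5-minute marks in local (-07:00) time, and a window treated as closed once its end is at or before the watermark. The helper names `sliding_windows` and `replay` are hypothetical.

```python
from datetime import datetime, timedelta

DELAY = timedelta(minutes=10)   # withWatermark("timestamp", "10 minutes")
WINDOW = timedelta(minutes=10)  # window duration
SLIDE = timedelta(minutes=5)    # slide duration


def sliding_windows(ts):
    """Return every (start, end) window that contains ts, newest first."""
    # Assumption: window starts fall on 5-minute marks.
    last_start = ts.replace(second=0, microsecond=0) - timedelta(minutes=ts.minute % 5)
    starts = [last_start - i * SLIDE for i in range(WINDOW // SLIDE)]
    return [(s, s + WINDOW) for s in starts]


def replay(event_times):
    """Yield (ts, watermark, windows_still_open) for one record per micro-batch."""
    max_seen = None
    for ts in event_times:
        # The watermark a batch uses comes from the max event time of earlier batches.
        watermark = None if max_seen is None else max_seen - DELAY
        open_windows = [w for w in sliding_windows(ts)
                        if watermark is None or w[1] > watermark]
        yield ts, watermark, open_windows
        max_seen = ts if max_seen is None else max(max_seen, ts)


# "ts" values from the Kafka topic, in insertion order (local -07:00 time).
events = [datetime(2022, 3, 16, 16, 12), datetime(2022, 3, 16, 16, 12),
          datetime(2022, 3, 15, 16, 12), datetime(2022, 3, 16, 10, 12),
          datetime(2022, 3, 16, 10, 10), datetime(2022, 3, 16, 11, 16),
          datetime(2022, 3, 16, 11, 16), datetime(2022, 3, 16, 16, 16),
          datetime(2022, 3, 14, 16, 16)]

for ts, wm, windows in replay(events):
    print(ts, "watermark:", wm, "late" if not windows else windows)
```

Under this model the 2022-03-14 record is late: both of its windows end two days before the 2022-03-16 16:06 watermark, which is the behavior the question expected to see.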

So, what am I doing wrong here? Any input is appreciated.

tia!

Update: it seems that Spark only guarantees that records within the watermark will be processed; records older than the watermark may or may not be processed. If the window's state has already been removed from the state store, the late records are dropped; otherwise they are still aggregated.

Is there any way to ensure (or force) that late data arriving after the watermark is not processed?
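One way to check what Spark actually did with late rows is to read the query's progress reports. A minimal sketch, assuming Spark 3.1 or later, where `StreamingQuery.lastProgress` (a plain dict in PySpark) carries the current watermark under `eventTime` and a `numRowsDroppedByWatermark` counter on each entry of `stateOperators`. The helper name `watermark_report` is hypothetical.

```python
def watermark_report(progress):
    """Summarise watermark information from a StreamingQuery.lastProgress dict.

    Assumes the Spark 3.1+ progress layout: eventTime.watermark and
    stateOperators[*].numRowsDroppedByWatermark.
    """
    if not progress:  # lastProgress is None until the first batch completes
        return None
    # Sum the late-row counters across all stateful operators in the query.
    dropped = sum(op.get("numRowsDroppedByWatermark", 0)
                  for op in progress.get("stateOperators", []))
    return {
        "batchId": progress.get("batchId"),
        "watermark": progress.get("eventTime", {}).get("watermark"),
        "rowsDroppedByWatermark": dropped,
    }


# Usage with the query from the question, while it is running:
#     print(watermark_report(result.lastProgress))
```

If the counter stays at 0 in the batch where a record older than the watermark shows up in the console output, that row was most likely aggregated rather than dropped.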


Solution

  • The watermark column name 'timestamp' is a reserved keyword; changing it to a non-reserved name, e.g. 'ts', fixed the issue.