apache-spark, spark-streaming, spark-structured-streaming, spark-streaming-kafka

Spark Structured Streaming watermark corresponding to deviceId


The incoming data is a stream like the one below, consisting of 3 columns:

[
 system -> deviceId,
 time -> eventTime,
 value -> some metric
]
+-------+-------------------+-----+
|system |time               |value|
+-------+-------------------+-----+
|system1|2019-08-20 07:13:10|1.5  |
|system2|2019-08-20 07:11:10|1.9  |
|system3|2019-08-20 07:13:15|1.3  |
|system1|2019-08-20 07:13:20|1.8  |
|system2|2019-08-20 07:11:20|1.6  |
|system3|2019-08-20 07:13:25|1.4  |
|system1|2019-08-20 07:13:30|1.2  |
|system2|2019-08-20 07:11:30|1.1  |
|system3|2019-08-20 07:13:35|1.5  |
+-------+-------------------+-----+

Each device produces data at a fixed interval of, say, [10 seconds].

I have a Spark Structured Streaming app which calculates the max of value with:

Window duration = 30 Seconds

Sliding duration = 30 Seconds

    import org.apache.spark.sql.functions.{col, max, window}

    // 30-second tumbling window per device, with a 30-second watermark on event time
    df.withWatermark("time", "30 seconds")
      .groupBy(
        window(col("time"), "30 seconds", "30 seconds"),
        col("system")
      )
      .agg(max("value"))
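
For context, below is a minimal sketch of how this aggregation might be wired into a complete streaming query. The Kafka topic name, bootstrap servers, and JSON schema are illustrative assumptions, not the actual setup:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, from_json, max, window}
    import org.apache.spark.sql.types.{DoubleType, StringType, StructType, TimestampType}

    val spark = SparkSession.builder.appName("DeviceMaxValue").getOrCreate()

    // Illustrative schema matching the three columns shown above
    val schema = new StructType()
      .add("system", StringType)
      .add("time", TimestampType)
      .add("value", DoubleType)

    // Hypothetical Kafka source; topic and bootstrap servers are placeholders
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "device-metrics")
      .load()
      .select(from_json(col("value").cast("string"), schema).as("payload"))
      .select("payload.*")

    // Same aggregation as above: max per device per 30-second window
    val maxPerDevice = df
      .withWatermark("time", "30 seconds")
      .groupBy(window(col("time"), "30 seconds", "30 seconds"), col("system"))
      .agg(max("value").as("max_value"))

    // Append mode emits a window only after the watermark passes its end
    maxPerDevice.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()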

Problem: As each device is independent, their clocks are also independent. A device may be choked and delay sending its data for various reasons, e.g. [network issues, heavy load on the device, etc.].

Now, since a single job processes the data, it starts dropping the choked device's data based on the (global) watermark, and we lose that data.

Is there any way or workaround to tie the watermark to the deviceId, so that the job maintains a watermark per [deviceId, eventTime] and does not drop one device's data because of the other devices?


Solution

  • From https://towardsdatascience.com/watermarking-in-spark-structured-streaming-9e164f373e9, as I cannot state it better myself:

    Since Spark 2.1, watermarking is introduced into Structured Streaming API. You can enable it by simply adding the withWatermark-Operator to a query:

    withWatermark(eventTime: String, delayThreshold: String): Dataset[T]

    It takes two parameters, a) an event time column (must be the same as the aggregate is working on) and b) a threshold to specify for how long late data should be processed (in event time units). The state of an aggregate will then be maintained by Spark until max eventTime - delayThreshold > T, where max eventTime is the latest event time seen by the engine and T is the starting time of a window. If late data fall within this threshold, the query gets updated eventually (right image in the figure in the linked article). Otherwise it gets dropped and no reprocessing is triggered (left image in the figure in the linked article).

    As you can read, the concept does not involve splitting the watermark by additional metadata such as deviceId.
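
    To make the quoted rule concrete with the sample data from the question, here is a small worked sketch in plain Scala (no Spark required); the extra late row from system2 is an assumed illustration:

        import java.time.{Duration, LocalDateTime}

        // The latest event time seen across ALL devices drives the single, global watermark.
        val delayThreshold = Duration.ofSeconds(30)
        val maxEventTime   = LocalDateTime.parse("2019-08-20T07:13:35") // newest row (system3)
        val watermark      = maxEventTime.minus(delayThreshold)         // 2019-08-20T07:13:05

        // Hypothetical next row from the lagging system2, with event time 07:11:40.
        // Its 30-second window is [07:11:30, 07:12:00), so T (the window start) is 07:11:30.
        val windowStartT = LocalDateTime.parse("2019-08-20T07:11:30")

        // max eventTime - delayThreshold (07:13:05) > T (07:11:30): that window's state is
        // already discarded, so the late system2 row is silently dropped.
        val dropped = watermark.isAfter(windowStartT)
        println(s"watermark = $watermark, late system2 row dropped = $dropped") // true

    Since the watermark is driven by the fastest device, the slow device's windows are closed early and its late rows are dropped, which is exactly the behaviour described in the question.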