apache-sparkspark-structured-streaming

How exactly does dropDuplicatesWithinWatermark work?


I have this code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Trigger}

import java.sql.Timestamp

case class Event(id: Long, ts: Timestamp)

object DropDuplicatesWithinWatermarkMain extends App {
  val spark = SparkSession
    .builder()
    .appName("dropDuplicatesWithinWatermarkTest")
    .master("local[*]")
    .getOrCreate()

  spark.streams.addListener(new StreamingQueryListener {
    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {

    }

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
      println(event.json)
    }

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {

    }
  })

  spark.sparkContext.setLogLevel("OFF")

  import spark.implicits._

  val memoryStream1 = MemoryStream[Event](1, spark.sqlContext, Some(5))
  val dfStr = memoryStream1.toDF()

  val streamingQuery = dfStr
    .withWatermark("ts", "20 seconds")
    .dropDuplicatesWithinWatermark("id")
    .writeStream
    .trigger(Trigger.ProcessingTime("1 second"))
    .format("console")
    .outputMode(OutputMode.Append())
    .start()

  memoryStream1.addData(Seq(
    Event(1, Timestamp.valueOf("2023-01-01 10:10:10")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:11")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:40")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:30")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:29")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:28")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:28")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:28")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:28")),
  ))

  streamingQuery.processAllAvailable()

  memoryStream1.addData(Seq(
    Event(1, Timestamp.valueOf("2023-01-01 10:10:21")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:22")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:23")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:24")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:56")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:57")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:00")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:20")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:21")),
  ))

  streamingQuery.processAllAvailable()

  memoryStream1.addData(Seq(
    Event(1, Timestamp.valueOf("2023-01-01 10:11:21")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:22")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:23")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:24")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:56")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:57")),
    Event(1, Timestamp.valueOf("2023-01-01 10:12:02")),
    Event(1, Timestamp.valueOf("2023-01-01 10:12:01")),
    Event(1, Timestamp.valueOf("2023-01-01 10:12:03"))
  ))

  streamingQuery.processAllAvailable()
}

This is the output it produces:

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------------+
| id|                 ts|
+---+-------------------+
|  1|2023-01-01 10: 10: 10|
+---+-------------------+

{
    "progress": {
        "id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
        "runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
        "name": null,
        "timestamp": "2023-11-19T20:45:54.001Z",
        "batchId": 0,
        "numInputRows": 9,
        "inputRowsPerSecond": 17.374517374517374,
        "processedRowsPerSecond": 0.356477997385828,
        "durationMs": {
            "addBatch": 24501,
            "commitOffsets": 317,
            "getBatch": 2,
            "latestOffset": 0,
            "queryPlanning": 214,
            "triggerExecution": 25247,
            "walCommit": 207
        },
        "eventTime": {
            "avg": "2023-01-01T07:10:25.777Z",
            "max": "2023-01-01T07:10:40.000Z",
            "min": "2023-01-01T07:10:10.000Z",
            "watermark": "1970-01-01T00:00:00.000Z"
        },
        "stateOperators": [
            {
                "operatorName": "dedupeWithinWatermark",
                "numRowsTotal": 1,
                "numRowsUpdated": 1,
                "allUpdatesTimeMs": 633,
                "numRowsRemoved": 0,
                "allRemovalsTimeMs": 156,
                "commitTimeMs": 532110,
                "memoryUsedBytes": 48256,
                "numRowsDroppedByWatermark": 0,
                "numShufflePartitions": 200,
                "numStateStoreInstances": 200,
                "customMetrics": {
                    "loadedMapCacheHitCount": 0,
                    "loadedMapCacheMissCount": 0,
                    "numDroppedDuplicateRows": 8,
                    "stateOnCurrentVersionSizeBytes": 19456
                }
            }
        ],
        "sources": [
            {
                "description": "MemoryStream[id#2L,ts#3]",
                "startOffset": null,
                "endOffset": 0,
                "latestOffset": 0,
                "numInputRows": 9,
                "inputRowsPerSecond": 17.374517374517374,
                "processedRowsPerSecond": 0.356477997385828
            }
        ],
        "sink": {
            "description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
            "numOutputRows": 1
        }
    }
}
-------------------------------------------
Batch: 1
-------------------------------------------
+---+---+
| id| ts|
+---+---+
+---+---+

{
    "progress": {
        "id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
        "runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
        "name": null,
        "timestamp": "2023-11-19T20:46:19.256Z",
        "batchId": 1,
        "numInputRows": 0,
        "inputRowsPerSecond": 0.0,
        "processedRowsPerSecond": 0.0,
        "durationMs": {
            "addBatch": 35702,
            "commitOffsets": 389,
            "getBatch": 0,
            "latestOffset": 0,
            "queryPlanning": 7,
            "triggerExecution": 36452,
            "walCommit": 349
        },
        "eventTime": {
            "watermark": "2023-01-01T07:10:20.000Z"
        },
        "stateOperators": [
            {
                "operatorName": "dedupeWithinWatermark",
                "numRowsTotal": 1,
                "numRowsUpdated": 0,
                "allUpdatesTimeMs": 0,
                "numRowsRemoved": 0,
                "allRemovalsTimeMs": 0,
                "commitTimeMs": 841879,
                "memoryUsedBytes": 85088,
                "numRowsDroppedByWatermark": 0,
                "numShufflePartitions": 200,
                "numStateStoreInstances": 200,
                "customMetrics": {
                    "loadedMapCacheHitCount": 200,
                    "loadedMapCacheMissCount": 0,
                    "numDroppedDuplicateRows": 0,
                    "stateOnCurrentVersionSizeBytes": 24176
                }
            }
        ],
        "sources": [
            {
                "description": "MemoryStream[id#2L,ts#3]",
                "startOffset": 0,
                "endOffset": 0,
                "latestOffset": 0,
                "numInputRows": 0,
                "inputRowsPerSecond": 0.0,
                "processedRowsPerSecond": 0.0
            }
        ],
        "sink": {
            "description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
            "numOutputRows": 0
        }
    }
}
-------------------------------------------
Batch: 2
-------------------------------------------
+---+---+
| id| ts|
+---+---+
+---+---+

{
    "progress": {
        "id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
        "runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
        "name": null,
        "timestamp": "2023-11-19T20:46:56.000Z",
        "batchId": 2,
        "numInputRows": 9,
        "inputRowsPerSecond": 30.82191780821918,
        "processedRowsPerSecond": 0.19603999215840032,
        "durationMs": {
            "addBatch": 45082,
            "commitOffsets": 440,
            "getBatch": 0,
            "latestOffset": 0,
            "queryPlanning": 7,
            "triggerExecution": 45909,
            "walCommit": 380
        },
        "eventTime": {
            "avg": "2023-01-01T07:10:47.111Z",
            "max": "2023-01-01T07:11:21.000Z",
            "min": "2023-01-01T07:10:21.000Z",
            "watermark": "2023-01-01T07:10:20.000Z"
        },
        "stateOperators": [
            {
                "operatorName": "dedupeWithinWatermark",
                "numRowsTotal": 1,
                "numRowsUpdated": 0,
                "allUpdatesTimeMs": 0,
                "numRowsRemoved": 0,
                "allRemovalsTimeMs": 0,
                "commitTimeMs": 1040896,
                "memoryUsedBytes": 89808,
                "numRowsDroppedByWatermark": 0,
                "numShufflePartitions": 200,
                "numStateStoreInstances": 200,
                "customMetrics": {
                    "loadedMapCacheHitCount": 400,
                    "loadedMapCacheMissCount": 0,
                    "numDroppedDuplicateRows": 9,
                    "stateOnCurrentVersionSizeBytes": 24176
                }
            }
        ],
        "sources": [
            {
                "description": "MemoryStream[id#2L,ts#3]",
                "startOffset": 0,
                "endOffset": 1,
                "latestOffset": 1,
                "numInputRows": 9,
                "inputRowsPerSecond": 30.82191780821918,
                "processedRowsPerSecond": 0.19603999215840032
            }
        ],
        "sink": {
            "description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
            "numOutputRows": 0
        }
    }
}
-------------------------------------------
Batch: 3
-------------------------------------------
+---+---+
| id| ts|
+---+---+
+---+---+

{
    "progress": {
        "id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
        "runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
        "name": null,
        "timestamp": "2023-11-19T20:47:41.909Z",
        "batchId": 3,
        "numInputRows": 0,
        "inputRowsPerSecond": 0.0,
        "processedRowsPerSecond": 0.0,
        "durationMs": {
            "addBatch": 51214,
            "commitOffsets": 446,
            "getBatch": 0,
            "latestOffset": 0,
            "queryPlanning": 4,
            "triggerExecution": 52093,
            "walCommit": 427
        },
        "eventTime": {
            "watermark": "2023-01-01T07:11:01.000Z"
        },
        "stateOperators": [
            {
                "operatorName": "dedupeWithinWatermark",
                "numRowsTotal": 0,
                "numRowsUpdated": 0,
                "allUpdatesTimeMs": 0,
                "numRowsRemoved": 1,
                "allRemovalsTimeMs": 1,
                "commitTimeMs": 1185689,
                "memoryUsedBytes": 89776,
                "numRowsDroppedByWatermark": 0,
                "numShufflePartitions": 200,
                "numStateStoreInstances": 200,
                "customMetrics": {
                    "loadedMapCacheHitCount": 600,
                    "loadedMapCacheMissCount": 0,
                    "numDroppedDuplicateRows": 0,
                    "stateOnCurrentVersionSizeBytes": 24000
                }
            }
        ],
        "sources": [
            {
                "description": "MemoryStream[id#2L,ts#3]",
                "startOffset": 1,
                "endOffset": 1,
                "latestOffset": 1,
                "numInputRows": 0,
                "inputRowsPerSecond": 0.0,
                "processedRowsPerSecond": 0.0
            }
        ],
        "sink": {
            "description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
            "numOutputRows": 0
        }
    }
}
-------------------------------------------
Batch: 4
-------------------------------------------
+---+-------------------+
| id|                 ts|
+---+-------------------+
|  1|2023-01-01 10: 11: 21|
+---+-------------------+

{
    "progress": {
        "id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
        "runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
        "name": null,
        "timestamp": "2023-11-19T20:48:35.001Z",
        "batchId": 4,
        "numInputRows": 9,
        "inputRowsPerSecond": 9.01803607214429,
        "processedRowsPerSecond": 0.15629341483745485,
        "durationMs": {
            "addBatch": 56634,
            "commitOffsets": 496,
            "getBatch": 0,
            "latestOffset": 0,
            "queryPlanning": 4,
            "triggerExecution": 57584,
            "walCommit": 449
        },
        "eventTime": {
            "avg": "2023-01-01T07:11:43.222Z",
            "max": "2023-01-01T07:12:03.000Z",
            "min": "2023-01-01T07:11:21.000Z",
            "watermark": "2023-01-01T07:11:01.000Z"
        },
        "stateOperators": [
            {
                "operatorName": "dedupeWithinWatermark",
                "numRowsTotal": 1,
                "numRowsUpdated": 1,
                "allUpdatesTimeMs": 1,
                "numRowsRemoved": 0,
                "allRemovalsTimeMs": 0,
                "commitTimeMs": 1308628,
                "memoryUsedBytes": 89776,
                "numRowsDroppedByWatermark": 0,
                "numShufflePartitions": 200,
                "numStateStoreInstances": 200,
                "customMetrics": {
                    "loadedMapCacheHitCount": 800,
                    "loadedMapCacheMissCount": 0,
                    "numDroppedDuplicateRows": 8,
                    "stateOnCurrentVersionSizeBytes": 24176
                }
            }
        ],
        "sources": [
            {
                "description": "MemoryStream[id#2L,ts#3]",
                "startOffset": 1,
                "endOffset": 2,
                "latestOffset": 2,
                "numInputRows": 9,
                "inputRowsPerSecond": 9.01803607214429,
                "processedRowsPerSecond": 0.15629341483745485
            }
        ],
        "sink": {
            "description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
            "numOutputRows": 1
        }
    }
}
-------------------------------------------
Batch: 5
-------------------------------------------
+---+---+
| id| ts|
+---+---+
+---+---+

{
    "progress": {
        "id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
        "runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
        "name": null,
        "timestamp": "2023-11-19T20:49:32.586Z",
        "batchId": 5,
        "numInputRows": 0,
        "inputRowsPerSecond": 0.0,
        "processedRowsPerSecond": 0.0,
        "durationMs": {
            "addBatch": 63828,
            "commitOffsets": 511,
            "getBatch": 0,
            "latestOffset": 0,
            "queryPlanning": 6,
            "triggerExecution": 64812,
            "walCommit": 467
        },
        "eventTime": {
            "watermark": "2023-01-01T07:11:43.000Z"
        },
        "stateOperators": [
            {
                "operatorName": "dedupeWithinWatermark",
                "numRowsTotal": 0,
                "numRowsUpdated": 0,
                "allUpdatesTimeMs": 0,
                "numRowsRemoved": 1,
                "allRemovalsTimeMs": 1,
                "commitTimeMs": 1479275,
                "memoryUsedBytes": 89776,
                "numRowsDroppedByWatermark": 0,
                "numShufflePartitions": 200,
                "numStateStoreInstances": 200,
                "customMetrics": {
                    "loadedMapCacheHitCount": 1000,
                    "loadedMapCacheMissCount": 0,
                    "numDroppedDuplicateRows": 0,
                    "stateOnCurrentVersionSizeBytes": 24000
                }
            }
        ],
        "sources": [
            {
                "description": "MemoryStream[id#2L,ts#3]",
                "startOffset": 2,
                "endOffset": 2,
                "latestOffset": 2,
                "numInputRows": 0,
                "inputRowsPerSecond": 0.0,
                "processedRowsPerSecond": 0.0
            }
        ],
        "sink": {
            "description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
            "numOutputRows": 0
        }
    }
}

I really don't understand the output of this code. Could somebody explain how exactly this works? It is not intuitive at all. Why are there 6 microbatches when I add only 3 of them to MemoryStream? The outputs of these microbatches are even more of a mystery.

I get the output of the first microbatch, it takes the first record, and all the other records in that microbatch it deduplicates by id. Also, each record whose timestamp - 20 seconds is higher than the current watermark, updates the current watermark.

I don't understand all the other output, 6 microbatches and the data that is written out.

Does anybody know how this works?

I've read this article (and understood it): https://www.waitingforcode.com/apache-spark-structured-streaming/what-new-apache-spark-3.5.0-structured-streaming/read#onQueryIdle

And also read this: https://issues.apache.org/jira/browse/SPARK-42931

Including the pdf that is there with an example(I understood the examples gives in the pdf). And I still don't get why my code works the way it works.

Also, this code works extremely slow when using dropDuplicatesWithinWatermark, why is that? When you remove dropDuplicatesWithinWatermark it works really fast.


Solution

  • Let's say we have this case class:

    case class Event(id: Long, ts: Timestamp) 
    

    And these microbatches:

    // microbatch 1
    Event(1, Timestamp.valueOf("2023-06-10 10:20:40"))
    Event(1, Timestamp.valueOf("2023-06-10 10:20:30"))
    Event(2, Timestamp.valueOf("2023-06-10 10:20:50"))
    Event(3, Timestamp.valueOf("2023-06-10 10:20:45"))
    
    // microbatch 2
    Event(1, Timestamp.valueOf("2023-06-10 10:22:40"))
    Event(1, Timestamp.valueOf("2023-06-10 10:20:10"))
    Event(4, Timestamp.valueOf("2023-06-10 10:21:50"))
    Event(5, Timestamp.valueOf("2023-06-10 10:21:45"))
    
    // microbatch 3
    Event(1, Timestamp.valueOf("2023-06-10 10:24:40"))
    

    Processing goes like this:

    // watermark for microbatch 1 = 1970-01-01 00:00:00
    Event(1, Timestamp.valueOf("2023-06-10 10:20:40"))  // put this in state {id: 1, expiresAt 10:21:00}, future watermark value (FWM) = 10:20:20 (date omitted from here on)
    Event(1, Timestamp.valueOf("2023-06-10 10:20:30"))  // deduplicated, watermark didn't advance
    Event(2, Timestamp.valueOf("2023-06-10 10:20:50"))  // put this in state {id: 2, expiresAt: 10:21:10}, FWM = 10:20:30
    Event(3, Timestamp.valueOf("2023-06-10 10:20:45"))  // put this in state {id: 3, expiresAt: 10:21:05}, watermark didn't advance
    
    output: 
       Event(1, Timestamp.valueOf("2023-06-10 10:20:40"))
       Event(2, Timestamp.valueOf("2023-06-10 10:20:50"))
       Event(3, Timestamp.valueOf("2023-06-10 10:20:45"))
    
    
    
    
    
    // watermark for microbatch 2 = 10:20:30 (value taken from the last FWM in the previous microbatch)
    // before executing this microbatch we also look at state and check if any value's expiresAt is <= watermark, there's no such values in the state, so we don't remove anything
    Event(1, Timestamp.valueOf("2023-06-10 10:22:40")) // deduplicated, FWM = 10:22:20
    Event(1, Timestamp.valueOf("2023-06-10 10:20:10")) // deduplicated, watermark didn't advance
    Event(4, Timestamp.valueOf("2023-06-10 10:21:50")) // put this in state {id: 4, expiresAt: 10:22:10} watermark didn't advance
    Event(5, Timestamp.valueOf("2023-06-10 10:21:45")) // put this in state {id: 5, expiresAt: 10:22:05}, watermark didn't advance
    
    output:
        Event(4, Timestamp.valueOf("2023-06-10 10:21:50"))
        Event(5, Timestamp.valueOf("2023-06-10 10:21:45"))
    
    
    
    
    
    // watermark = 10:22:20 (from FWM in the previous microbatch)
    // in the state we have 
    // {id: 1, expiresAt 10:21:00}  -- remove from state because expiresAt <= watermark
    // {id: 2, expiresAt: 10:21:10} -- remove from state because expiresAt <= watermark
    // {id: 3, expiresAt: 10:21:05} -- remove from state because expiresAt <= watermark 
    // {id: 4, expiresAt: 10:22:10} -- remove from state because expiresAt <= watermark
    // {id: 5, expiresAt: 10:22:05} -- remove from state because expiresAt <= watermark
    
    // watermark for microbatch 3 = 10:22:20
    Event(1, Timestamp.valueOf("2023-06-10 10:24:40")) // put this in state {id: 1, expiresAt: 10:25:00}, FWM = 10:24:20
    output:
        Event(1, Timestamp.valueOf("2023-06-10 10:24:40"))