I have this code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Trigger}
import java.sql.Timestamp

case class Event(id: Long, ts: Timestamp)

object DropDuplicatesWithinWatermarkMain extends App {
  val spark = SparkSession
    .builder()
    .appName("dropDuplicatesWithinWatermarkTest")
    .master("local[*]")
    .getOrCreate()

  spark.streams.addListener(new StreamingQueryListener {
    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
      println(event.json)
    }

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
  })

  spark.sparkContext.setLogLevel("OFF")

  import spark.implicits._

  val memoryStream1 = MemoryStream[Event](1, spark.sqlContext, Some(5))
  val dfStr = memoryStream1.toDF()

  val streamingQuery = dfStr
    .withWatermark("ts", "20 seconds")
    .dropDuplicatesWithinWatermark("id")
    .writeStream
    .trigger(Trigger.ProcessingTime("1 second"))
    .format("console")
    .outputMode(OutputMode.Append())
    .start()

  memoryStream1.addData(Seq(
    Event(1, Timestamp.valueOf("2023-01-01 10:10:10")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:11")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:40")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:30")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:29")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:28")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:28")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:28")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:28"))
  ))
  streamingQuery.processAllAvailable()

  memoryStream1.addData(Seq(
    Event(1, Timestamp.valueOf("2023-01-01 10:10:21")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:22")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:23")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:24")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:56")),
    Event(1, Timestamp.valueOf("2023-01-01 10:10:57")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:00")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:20")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:21"))
  ))
  streamingQuery.processAllAvailable()

  memoryStream1.addData(Seq(
    Event(1, Timestamp.valueOf("2023-01-01 10:11:21")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:22")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:23")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:24")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:56")),
    Event(1, Timestamp.valueOf("2023-01-01 10:11:57")),
    Event(1, Timestamp.valueOf("2023-01-01 10:12:02")),
    Event(1, Timestamp.valueOf("2023-01-01 10:12:01")),
    Event(1, Timestamp.valueOf("2023-01-01 10:12:03"))
  ))
  streamingQuery.processAllAvailable()
}
This is the output it produces:
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------------+
| id| ts|
+---+-------------------+
|  1|2023-01-01 10:10:10|
+---+-------------------+
{
"progress": {
"id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
"runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
"name": null,
"timestamp": "2023-11-19T20:45:54.001Z",
"batchId": 0,
"numInputRows": 9,
"inputRowsPerSecond": 17.374517374517374,
"processedRowsPerSecond": 0.356477997385828,
"durationMs": {
"addBatch": 24501,
"commitOffsets": 317,
"getBatch": 2,
"latestOffset": 0,
"queryPlanning": 214,
"triggerExecution": 25247,
"walCommit": 207
},
"eventTime": {
"avg": "2023-01-01T07:10:25.777Z",
"max": "2023-01-01T07:10:40.000Z",
"min": "2023-01-01T07:10:10.000Z",
"watermark": "1970-01-01T00:00:00.000Z"
},
"stateOperators": [
{
"operatorName": "dedupeWithinWatermark",
"numRowsTotal": 1,
"numRowsUpdated": 1,
"allUpdatesTimeMs": 633,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 156,
"commitTimeMs": 532110,
"memoryUsedBytes": 48256,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 200,
"numStateStoreInstances": 200,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"numDroppedDuplicateRows": 8,
"stateOnCurrentVersionSizeBytes": 19456
}
}
],
"sources": [
{
"description": "MemoryStream[id#2L,ts#3]",
"startOffset": null,
"endOffset": 0,
"latestOffset": 0,
"numInputRows": 9,
"inputRowsPerSecond": 17.374517374517374,
"processedRowsPerSecond": 0.356477997385828
}
],
"sink": {
"description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
"numOutputRows": 1
}
}
}
-------------------------------------------
Batch: 1
-------------------------------------------
+---+---+
| id| ts|
+---+---+
+---+---+
{
"progress": {
"id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
"runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
"name": null,
"timestamp": "2023-11-19T20:46:19.256Z",
"batchId": 1,
"numInputRows": 0,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 0.0,
"durationMs": {
"addBatch": 35702,
"commitOffsets": 389,
"getBatch": 0,
"latestOffset": 0,
"queryPlanning": 7,
"triggerExecution": 36452,
"walCommit": 349
},
"eventTime": {
"watermark": "2023-01-01T07:10:20.000Z"
},
"stateOperators": [
{
"operatorName": "dedupeWithinWatermark",
"numRowsTotal": 1,
"numRowsUpdated": 0,
"allUpdatesTimeMs": 0,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 841879,
"memoryUsedBytes": 85088,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 200,
"numStateStoreInstances": 200,
"customMetrics": {
"loadedMapCacheHitCount": 200,
"loadedMapCacheMissCount": 0,
"numDroppedDuplicateRows": 0,
"stateOnCurrentVersionSizeBytes": 24176
}
}
],
"sources": [
{
"description": "MemoryStream[id#2L,ts#3]",
"startOffset": 0,
"endOffset": 0,
"latestOffset": 0,
"numInputRows": 0,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 0.0
}
],
"sink": {
"description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
"numOutputRows": 0
}
}
}
-------------------------------------------
Batch: 2
-------------------------------------------
+---+---+
| id| ts|
+---+---+
+---+---+
{
"progress": {
"id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
"runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
"name": null,
"timestamp": "2023-11-19T20:46:56.000Z",
"batchId": 2,
"numInputRows": 9,
"inputRowsPerSecond": 30.82191780821918,
"processedRowsPerSecond": 0.19603999215840032,
"durationMs": {
"addBatch": 45082,
"commitOffsets": 440,
"getBatch": 0,
"latestOffset": 0,
"queryPlanning": 7,
"triggerExecution": 45909,
"walCommit": 380
},
"eventTime": {
"avg": "2023-01-01T07:10:47.111Z",
"max": "2023-01-01T07:11:21.000Z",
"min": "2023-01-01T07:10:21.000Z",
"watermark": "2023-01-01T07:10:20.000Z"
},
"stateOperators": [
{
"operatorName": "dedupeWithinWatermark",
"numRowsTotal": 1,
"numRowsUpdated": 0,
"allUpdatesTimeMs": 0,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 1040896,
"memoryUsedBytes": 89808,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 200,
"numStateStoreInstances": 200,
"customMetrics": {
"loadedMapCacheHitCount": 400,
"loadedMapCacheMissCount": 0,
"numDroppedDuplicateRows": 9,
"stateOnCurrentVersionSizeBytes": 24176
}
}
],
"sources": [
{
"description": "MemoryStream[id#2L,ts#3]",
"startOffset": 0,
"endOffset": 1,
"latestOffset": 1,
"numInputRows": 9,
"inputRowsPerSecond": 30.82191780821918,
"processedRowsPerSecond": 0.19603999215840032
}
],
"sink": {
"description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
"numOutputRows": 0
}
}
}
-------------------------------------------
Batch: 3
-------------------------------------------
+---+---+
| id| ts|
+---+---+
+---+---+
{
"progress": {
"id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
"runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
"name": null,
"timestamp": "2023-11-19T20:47:41.909Z",
"batchId": 3,
"numInputRows": 0,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 0.0,
"durationMs": {
"addBatch": 51214,
"commitOffsets": 446,
"getBatch": 0,
"latestOffset": 0,
"queryPlanning": 4,
"triggerExecution": 52093,
"walCommit": 427
},
"eventTime": {
"watermark": "2023-01-01T07:11:01.000Z"
},
"stateOperators": [
{
"operatorName": "dedupeWithinWatermark",
"numRowsTotal": 0,
"numRowsUpdated": 0,
"allUpdatesTimeMs": 0,
"numRowsRemoved": 1,
"allRemovalsTimeMs": 1,
"commitTimeMs": 1185689,
"memoryUsedBytes": 89776,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 200,
"numStateStoreInstances": 200,
"customMetrics": {
"loadedMapCacheHitCount": 600,
"loadedMapCacheMissCount": 0,
"numDroppedDuplicateRows": 0,
"stateOnCurrentVersionSizeBytes": 24000
}
}
],
"sources": [
{
"description": "MemoryStream[id#2L,ts#3]",
"startOffset": 1,
"endOffset": 1,
"latestOffset": 1,
"numInputRows": 0,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 0.0
}
],
"sink": {
"description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
"numOutputRows": 0
}
}
}
-------------------------------------------
Batch: 4
-------------------------------------------
+---+-------------------+
| id| ts|
+---+-------------------+
|  1|2023-01-01 10:11:21|
+---+-------------------+
{
"progress": {
"id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
"runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
"name": null,
"timestamp": "2023-11-19T20:48:35.001Z",
"batchId": 4,
"numInputRows": 9,
"inputRowsPerSecond": 9.01803607214429,
"processedRowsPerSecond": 0.15629341483745485,
"durationMs": {
"addBatch": 56634,
"commitOffsets": 496,
"getBatch": 0,
"latestOffset": 0,
"queryPlanning": 4,
"triggerExecution": 57584,
"walCommit": 449
},
"eventTime": {
"avg": "2023-01-01T07:11:43.222Z",
"max": "2023-01-01T07:12:03.000Z",
"min": "2023-01-01T07:11:21.000Z",
"watermark": "2023-01-01T07:11:01.000Z"
},
"stateOperators": [
{
"operatorName": "dedupeWithinWatermark",
"numRowsTotal": 1,
"numRowsUpdated": 1,
"allUpdatesTimeMs": 1,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 1308628,
"memoryUsedBytes": 89776,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 200,
"numStateStoreInstances": 200,
"customMetrics": {
"loadedMapCacheHitCount": 800,
"loadedMapCacheMissCount": 0,
"numDroppedDuplicateRows": 8,
"stateOnCurrentVersionSizeBytes": 24176
}
}
],
"sources": [
{
"description": "MemoryStream[id#2L,ts#3]",
"startOffset": 1,
"endOffset": 2,
"latestOffset": 2,
"numInputRows": 9,
"inputRowsPerSecond": 9.01803607214429,
"processedRowsPerSecond": 0.15629341483745485
}
],
"sink": {
"description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
"numOutputRows": 1
}
}
}
-------------------------------------------
Batch: 5
-------------------------------------------
+---+---+
| id| ts|
+---+---+
+---+---+
{
"progress": {
"id": "554921ec-cfdf-4c66-85eb-156ae3e828bc",
"runId": "b13e986a-ccdf-4aac-b9fb-a6ad34845fbc",
"name": null,
"timestamp": "2023-11-19T20:49:32.586Z",
"batchId": 5,
"numInputRows": 0,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 0.0,
"durationMs": {
"addBatch": 63828,
"commitOffsets": 511,
"getBatch": 0,
"latestOffset": 0,
"queryPlanning": 6,
"triggerExecution": 64812,
"walCommit": 467
},
"eventTime": {
"watermark": "2023-01-01T07:11:43.000Z"
},
"stateOperators": [
{
"operatorName": "dedupeWithinWatermark",
"numRowsTotal": 0,
"numRowsUpdated": 0,
"allUpdatesTimeMs": 0,
"numRowsRemoved": 1,
"allRemovalsTimeMs": 1,
"commitTimeMs": 1479275,
"memoryUsedBytes": 89776,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 200,
"numStateStoreInstances": 200,
"customMetrics": {
"loadedMapCacheHitCount": 1000,
"loadedMapCacheMissCount": 0,
"numDroppedDuplicateRows": 0,
"stateOnCurrentVersionSizeBytes": 24000
}
}
],
"sources": [
{
"description": "MemoryStream[id#2L,ts#3]",
"startOffset": 2,
"endOffset": 2,
"latestOffset": 2,
"numInputRows": 0,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 0.0
}
],
"sink": {
"description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@71aa1612",
"numOutputRows": 0
}
}
}
I really don't understand the output of this code. Could somebody explain how exactly this works? It is not intuitive at all. Why are there 6 microbatches when I add only 3 batches of data to MemoryStream? The outputs of these microbatches are even more of a mystery.
I get the output of the first microbatch: it takes the first record and deduplicates all the other records in that microbatch by id. Also, each record whose timestamp minus 20 seconds is higher than the current watermark advances the watermark.
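In code form, my mental model of that watermark rule is this minimal sketch (timestamps copied from the progress JSON above; the real internals may of course differ):

import java.sql.Timestamp

val delayMs = 20 * 1000L // from withWatermark("ts", "20 seconds")
val maxEventTimeSeen = Timestamp.valueOf("2023-01-01 10:10:40") // max ts in batch 0

// the watermark applied to the NEXT microbatch = max event time seen so far - delay
val nextWatermark = new Timestamp(maxEventTimeSeen.getTime - delayMs)
println(nextWatermark) // 2023-01-01 10:10:20.0, matching batch 1's "watermark" (printed there in UTC)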
I don't understand the rest of the output: the 6 microbatches and the data that is written out.
Does anybody know how this works?
I've read this article (and understood it): https://www.waitingforcode.com/apache-spark-structured-streaming/what-new-apache-spark-3.5.0-structured-streaming/read#onQueryIdle
And I've also read this: https://issues.apache.org/jira/browse/SPARK-42931, including the PDF attached there with an example (I understood the examples given in the PDF). And I still don't get why my code works the way it does.
Also, this code runs extremely slowly when using dropDuplicatesWithinWatermark. Why is that? When you remove dropDuplicatesWithinWatermark, it runs really fast.
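One pattern I do notice in the metrics above: "numShufflePartitions": 200 and "numStateStoreInstances": 200, meaning the stateful operator maintains one state store per shuffle partition, and every microbatch touches all 200 of them even for a handful of rows. A hedged guess at a speed-up for local testing (just lowering the shuffle partition count; not a claim about the operator's internals):

val spark = SparkSession
  .builder()
  .appName("dropDuplicatesWithinWatermarkTest")
  .master("local[*]")
  // default is 200; with 1, the dedup operator keeps 1 state store instance instead of 200
  .config("spark.sql.shuffle.partitions", "1")
  .getOrCreate()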
Let's say we have this case class:
case class Event(id: Long, ts: Timestamp)
And these microbatches:
// microbatch 1
Event(1, Timestamp.valueOf("2023-06-10 10:20:40"))
Event(1, Timestamp.valueOf("2023-06-10 10:20:30"))
Event(2, Timestamp.valueOf("2023-06-10 10:20:50"))
Event(3, Timestamp.valueOf("2023-06-10 10:20:45"))
// microbatch 2
Event(1, Timestamp.valueOf("2023-06-10 10:22:40"))
Event(1, Timestamp.valueOf("2023-06-10 10:20:10"))
Event(4, Timestamp.valueOf("2023-06-10 10:21:50"))
Event(5, Timestamp.valueOf("2023-06-10 10:21:45"))
// microbatch 3
Event(1, Timestamp.valueOf("2023-06-10 10:24:40"))
Processing goes like this:
// watermark for microbatch 1 = 1970-01-01 00:00:00
Event(1, Timestamp.valueOf("2023-06-10 10:20:40")) // put this in state {id: 1, expiresAt 10:21:00}, future watermark value (FWM) = 10:20:20 (date omitted from here on)
Event(1, Timestamp.valueOf("2023-06-10 10:20:30")) // deduplicated, watermark didn't advance
Event(2, Timestamp.valueOf("2023-06-10 10:20:50")) // put this in state {id: 2, expiresAt: 10:21:10}, FWM = 10:20:30
Event(3, Timestamp.valueOf("2023-06-10 10:20:45")) // put this in state {id: 3, expiresAt: 10:21:05}, watermark didn't advance
output:
Event(1, Timestamp.valueOf("2023-06-10 10:20:40"))
Event(2, Timestamp.valueOf("2023-06-10 10:20:50"))
Event(3, Timestamp.valueOf("2023-06-10 10:20:45"))
// watermark for microbatch 2 = 10:20:30 (value taken from the last FWM in the previous microbatch)
// before executing this microbatch we also look at the state and check whether any value's expiresAt is <= watermark; there are no such values in the state, so we don't remove anything
Event(1, Timestamp.valueOf("2023-06-10 10:22:40")) // deduplicated, FWM = 10:22:20
Event(1, Timestamp.valueOf("2023-06-10 10:20:10")) // deduplicated, watermark didn't advance
Event(4, Timestamp.valueOf("2023-06-10 10:21:50")) // put this in state {id: 4, expiresAt: 10:22:10}, watermark didn't advance
Event(5, Timestamp.valueOf("2023-06-10 10:21:45")) // put this in state {id: 5, expiresAt: 10:22:05}, watermark didn't advance
output:
Event(4, Timestamp.valueOf("2023-06-10 10:21:50"))
Event(5, Timestamp.valueOf("2023-06-10 10:21:45"))
// watermark = 10:22:20 (from FWM in the previous microbatch)
// note: this cleanup runs in an extra no-data microbatch that Spark triggers solely to
// advance the watermark and evict expired state; that's why the console output above shows
// more microbatches than addData calls (batches 1, 3 and 5 carry no data)
// in the state we have
// {id: 1, expiresAt: 10:21:00} -- remove from state because expiresAt <= watermark
// {id: 2, expiresAt: 10:21:10} -- remove from state because expiresAt <= watermark
// {id: 3, expiresAt: 10:21:05} -- remove from state because expiresAt <= watermark
// {id: 4, expiresAt: 10:22:10} -- remove from state because expiresAt <= watermark
// {id: 5, expiresAt: 10:22:05} -- remove from state because expiresAt <= watermark
// watermark for microbatch 3 = 10:22:20
Event(1, Timestamp.valueOf("2023-06-10 10:24:40")) // put this in state {id: 1, expiresAt: 10:25:00}, FWM = 10:24:20
output:
Event(1, Timestamp.valueOf("2023-06-10 10:24:40"))