Tags: apache-spark, duplicates, apache-spark-sql, out-of-memory, spark-structured-streaming

How to expire state of dropDuplicates in structured streaming to avoid OOM?


I want to count unique accesses per day using Spark Structured Streaming, so I use the following code:

.dropDuplicates("uuid")

The next day, the state maintained for the current day should be dropped, so that the next day's unique-access count is correct and the job does not run out of memory. The Spark documentation suggests using dropDuplicates with a watermark, for example:

.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")

However, the watermark column must be included in dropDuplicates. In that case uuid and timestamp together form the deduplication key, so only events with the same uuid and the same timestamp are treated as duplicates, which is not what I want.
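To make the problem concrete, here is a minimal sketch that models the deduplication key with plain Scala collections rather than Spark. The `Event` case class, the sample values, and the helper names are hypothetical; only the key semantics are meant to mirror `dropDuplicates`, and `distinctBy` assumes Scala 2.13 or later.

```scala
// Plain-Scala model of the dedup key; no Spark involved.
// Event and its fields are hypothetical; timestamp is in epoch seconds.
final case class Event(uuid: String, timestamp: Long)

object DedupKeyDemo {
  // Two visits by the same user 60 seconds apart, plus one visit by another user.
  val events: Seq[Event] = Seq(
    Event("user-a", 1700000000L),
    Event("user-a", 1700000060L),
    Event("user-b", 1700000100L)
  )

  // Key = uuid only: what a unique-access count needs.
  def byUuid(es: Seq[Event]): Seq[Event] =
    es.distinctBy(_.uuid)

  // Key = (uuid, timestamp): the combined key described above.
  def byUuidAndTimestamp(es: Seq[Event]): Seq[Event] =
    es.distinctBy(e => (e.uuid, e.timestamp))
}
```

With the combined key, the second visit of `user-a` survives because its timestamp differs, so the count of unique accesses would be inflated.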

Is there a way to deduplicate on uuid alone and still have the state expire each day?


Solution

  • After a few days of effort, I finally found a way myself.

    While studying the source code of withWatermark and dropDuplicates, I discovered that, besides an event-time column, a watermark can also be defined on a window column, so we can use the following code:

    .select(
      window($"timestamp", "1 day"),
      $"timestamp",
      $"uuid"
    )
    .withWatermark("window", "1 day")
    .dropDuplicates("uuid", "window")
    

    Since all events on the same day fall into the same window, this produces the same result as deduplicating on uuid alone within each day, and the watermark lets Spark drop a day's state once that window has expired. Hope this helps someone.
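The reasoning that same-day events share one window can be checked with a small plain-Scala model of a 1-day tumbling window. This is only a sketch, not Spark code: `Visit`, `windowStart`, `dedupPerDay`, and `uniquePerDay` are hypothetical names, and the model assumes windows aligned to the Unix epoch in UTC, which is how `window()` behaves when no start offset is given.

```scala
// Plain-Scala model of a 1-day tumbling window; no Spark involved.
// Assumption: windows are aligned to 1970-01-01 00:00:00 UTC.
final case class Visit(uuid: String, epochSeconds: Long)

object DailyWindowDemo {
  private val SecondsPerDay = 86400L

  // Start of the 1-day window that contains the given instant.
  def windowStart(epochSeconds: Long): Long =
    epochSeconds - java.lang.Math.floorMod(epochSeconds, SecondsPerDay)

  // Deduplicate on (uuid, window): at most one visit per user per day survives.
  def dedupPerDay(visits: Seq[Visit]): Seq[Visit] =
    visits.distinctBy(v => (v.uuid, windowStart(v.epochSeconds)))

  // Number of unique users per window start.
  def uniquePerDay(visits: Seq[Visit]): Map[Long, Int] =
    dedupPerDay(visits)
      .groupBy(v => windowStart(v.epochSeconds))
      .map { case (start, vs) => start -> vs.size }

  // Sample data: three visits on one UTC day, one visit by the same user the next day.
  val sample: Seq[Visit] = Seq(
    Visit("user-a", 1699920000L),
    Visit("user-a", 1699920000L + 3600L),
    Visit("user-b", 1699920000L + 7200L),
    Visit("user-a", 1700006400L + 10L)
  )
}
```

Under this model a user who returns the next day is counted again, because the window part of the key changes. Note that a "day" here is a UTC day; if the business day follows another time zone, the window boundaries would presumably need an explicit start offset.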