I want to count the unique access for each day using spark structured streaming, so I use the following code
.dropDuplicates("uuid")
and in the next day the state maintained for today should be dropped so that I can get the right count of unique access of the next day and avoid OOM. The spark document indicates using dropDuplicates with watermark, for example:
.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")
but the watermark column must be specified in dropDuplicates. In such case the uuid and timestamp will be used as a combined key to deduplicate elements with the same uuid and timestamp, which is not what I expected.
So is there a perfect solution?
After a few days effort I finally find out the way myself.
While studying the source code of watermark and dropDuplicates, I discovered that besides an eventTime column, watermark also supports window column, so we can use the following code:
.select(
window($"timestamp", "1 day"),
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
Since all events in the same day have the same window, this will produce the same results as using only uuid to deduplicate. Hopes can help someone.