apache-flink flink-streaming

Apache Flink - How to Combine AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks?


Use case: event time, with timestamps extracted from Kafka records.

myConsumer.assignTimestampsAndWatermarks(new MyTimestampEmitter());
...
stream
        .keyBy("platform")
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .aggregate(new AggFunc(), new WindowFunc())
        .countWindowAll(size)
        .apply(someFunc)
        .addSink(someSink);

What I want: Flink extracts the timestamp and emits a watermark for every record during an initial interval (e.g. 20 seconds), and afterwards emits watermarks periodically (e.g. every 10 seconds).

Reason: With a periodic watermark assigner, Flink emits its first watermark only after some interval has passed, and the count in my first 5-minute window is wrong: it is much larger than the counts in the subsequent windows. As a workaround I set setAutoWatermarkInterval to 100 ms, but that is more frequent than necessary.

Currently, I must choose either AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks. How can I implement a strategy that combines the two? Thanks.


Solution

  • Before doing something unusual with your watermark generator, I would double-check that you've correctly diagnosed the situation. By and large, event-time windows should behave deterministically, and always produce the same results if presented with the same input. If you are getting results for the first window that vary depending on how often watermarks are being produced, that indicates that you probably have late events that are being dropped when the watermarks arrive more frequently, and are able to be included when the watermarks are less frequent. Perhaps your watermarks aren't correctly accounting for the actual degree of out-of-orderness your events are experiencing? Or perhaps your watermarks are based on System.currentTimeMillis(), rather than the event timestamps?
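To make the late-event diagnosis concrete, here is a small Flink-free simulation. It is only a sketch: the class and method names are made up for illustration, the watermark is assumed to be the largest timestamp seen so far (no out-of-orderness allowance), and watermark frequency is modeled as "one watermark every N events" rather than by wall-clock time. The lateness check mirrors the rule for a window with zero allowed lateness: an element is dropped once the watermark has reached the window's last timestamp.

```java
// Hypothetical, Flink-free sketch of why watermark frequency can change a window's count.
final class LateEventDemo {

    /**
     * Counts the events accepted into the window [windowStart, windowEnd) when a
     * watermark (the max timestamp seen so far) is emitted after every
     * eventsPerWatermark events. Zero allowed lateness is assumed.
     */
    static int countAccepted(long[] timestamps, long windowStart, long windowEnd,
                             int eventsPerWatermark) {
        long watermark = Long.MIN_VALUE;
        long maxSeen = Long.MIN_VALUE;
        int accepted = 0;
        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            boolean inWindow = ts >= windowStart && ts < windowEnd;
            // The window is already complete once the watermark reaches windowEnd - 1.
            boolean late = windowEnd - 1 <= watermark;
            if (inWindow && !late) {
                accepted++;
            }
            maxSeen = Math.max(maxSeen, ts);
            // The watermark is emitted after the element, every N elements.
            if ((i + 1) % eventsPerWatermark == 0) {
                watermark = maxSeen;
            }
        }
        return accepted;
    }
}
```

With the out-of-order input {100, 200, 310, 250} and the window [0, 300), a watermark after every event drops the event at 250 (the watermark is already 310 when it arrives), while a watermark after every fourth event still accepts it. That matches the symptom in the question: fewer watermarks, larger first-window count.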

    Also, it's normal for the first time window to be different than the others, because time windows are aligned to the epoch, rather than the first event. Of course, this has the effect that the first window covers a shorter period of time than all of the others, so you should expect it to contain fewer events, not more.
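The alignment can be checked with a little arithmetic. The helper below is a hypothetical stand-in, not Flink code; it assumes a window offset of zero and non-negative timestamps, and computes the start of the tumbling window that contains a given timestamp.

```java
// Hypothetical sketch: start of the epoch-aligned tumbling window containing a timestamp.
final class WindowAlignment {

    /** Assumes an offset of zero, i.e. window boundaries are multiples of windowSizeMs. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }
}
```

For example, with 5-minute windows (300,000 ms) and a first event at 1,500,000,200,000 ms, the window starts at 1,500,000,000,000 ms and ends 300,000 ms later. The job only sees the last 100,000 ms of that window, so the first window should contain fewer events than a full one.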

    Setting setAutoWatermarkInterval to 100ms is a perfectly normal thing to do. But if you really want to avoid this, you might consider an AssignerWithPunctuatedWatermarks that initially returns a watermark for every event, and then after a suitable interval, returns watermarks less often.

    In a punctuated watermark assigner, both the extractTimestamp and checkAndGetNextWatermark methods are called for every event. You can keep some transient (non-Flink) state in the assigner, either the time of the first event or a count of events, and use it in checkAndGetNextWatermark to eventually back off and stop producing a watermark for every event (by sometimes returning null from checkAndGetNextWatermark rather than a Watermark). Because that state is not checkpointed, your application will go back to generating a watermark for every event whenever it is restarted.
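The back-off logic might look roughly like the sketch below. To keep it runnable without Flink on the classpath, it is written as a plain class with hypothetical names (AdaptiveWatermarkPolicy, onEvent). In a real assigner the body of onEvent would sit in checkAndGetNextWatermark and return a Watermark or null instead of a Long. Two further assumptions: the warm-up period and the steady interval are both measured in event time, and the watermark trails the largest timestamp by a fixed out-of-orderness bound.

```java
// Hypothetical, Flink-free sketch of an adaptive punctuated watermark policy.
final class AdaptiveWatermarkPolicy {
    private final long warmUpMs;            // emit on every event for this long (event time)
    private final long steadyIntervalMs;    // afterwards, minimum advance between watermarks
    private final long maxOutOfOrdernessMs; // how far the watermark trails the max timestamp

    // Plain fields, not Flink managed state: they reset whenever the job restarts.
    private boolean seenFirstEvent = false;
    private long firstTimestampMs;
    private long maxTimestampMs = Long.MIN_VALUE;
    private long lastWatermarkMs = Long.MIN_VALUE;

    AdaptiveWatermarkPolicy(long warmUpMs, long steadyIntervalMs, long maxOutOfOrdernessMs) {
        this.warmUpMs = warmUpMs;
        this.steadyIntervalMs = steadyIntervalMs;
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Returns the watermark to emit after this event, or null to emit none. */
    Long onEvent(long eventTimestampMs) {
        if (!seenFirstEvent) {
            seenFirstEvent = true;
            firstTimestampMs = eventTimestampMs;
        }
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        long candidate = maxTimestampMs - maxOutOfOrdernessMs;
        if (candidate <= lastWatermarkMs) {
            return null; // a watermark is only useful if it advances
        }
        boolean inWarmUp = maxTimestampMs - firstTimestampMs < warmUpMs;
        boolean intervalElapsed = lastWatermarkMs == Long.MIN_VALUE
                || candidate - lastWatermarkMs >= steadyIntervalMs;
        if (inWarmUp || intervalElapsed) {
            lastWatermarkMs = candidate;
            return candidate;
        }
        return null;
    }
}
```

With new AdaptiveWatermarkPolicy(20_000, 10_000, 0), every event in the first 20 seconds of event time that advances the maximum timestamp yields a watermark; after that, a watermark is returned only once it would advance by at least 10 seconds. Counting events instead of tracking the first timestamp would work the same way.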

    This will not yield an assigner with all of the characteristics of both periodic and punctuated assigners; it is simply an adaptive punctuated assigner.