How to add a custom WatermarkGenerator to a WatermarkStrategy


I'm using Apache Flink 1.11 and want to use a custom WatermarkGenerator.

With WatermarkStrategy, you can use the built-in WatermarkGenerators with ease:

WatermarkStrategy.forMonotonousTimestamps();

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

The documentation shows how to implement a custom WatermarkGenerator, for example a periodic WatermarkGenerator:

public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

How can I add this custom BoundedOutOfOrdernessGenerator to a WatermarkStrategy?


Solution

  • A WatermarkStrategy is the thing you need to define. Its only abstract method is createWatermarkGenerator, so a lambda can implement it. Assuming you have some class MyWatermarkGenerator that implements WatermarkGenerator<MyEvent>, you'd do something like:

        WatermarkStrategy<MyEvent> ws = ctx -> new MyWatermarkGenerator();

        ...
        DataStream<MyEvent> ds = xxx;
        // assignTimestampsAndWatermarks returns a new stream; keep using the result
        DataStream<MyEvent> withWatermarks = ds.assignTimestampsAndWatermarks(ws);

    Note that unless your source is setting up timestamps for you (e.g. Kafka record timestamps), you'll want to add a timestamp extractor to your WatermarkStrategy, as in:

        WatermarkStrategy<MyEvent> ws = ctx -> new MyWatermarkGenerator();
        ws = ws.withTimestampAssigner((r, ts) -> r.getTimestamp());
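
    To see what a generator like the one in the question emits, here is a minimal sketch that runs without Flink on the classpath. The Output, Generator and Strategy interfaces are hypothetical stand-ins that only mirror the shape of Flink's WatermarkOutput, WatermarkGenerator and WatermarkStrategy; they are not the real API (the real createWatermarkGenerator also takes a context argument). The configurable bound and the "one periodic tick after every event" loop are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkSketch {

    /** Stand-in for Flink's WatermarkOutput: receives watermark timestamps. */
    interface Output {
        void emitWatermark(long timestamp);
    }

    /** Stand-in for Flink's WatermarkGenerator<T>. */
    interface Generator<T> {
        void onEvent(T event, long eventTimestamp, Output output);
        void onPeriodicEmit(Output output);
    }

    /** Stand-in for WatermarkStrategy<T>: one abstract method, so a lambda can implement it. */
    interface Strategy<T> {
        Generator<T> createWatermarkGenerator();
    }

    /** Same logic as BoundedOutOfOrdernessGenerator above, with the bound passed in (assumption). */
    static class BoundedGenerator<T> implements Generator<T> {
        private final long maxOutOfOrderness;
        private long currentMaxTimestamp;

        BoundedGenerator(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        @Override
        public void onEvent(T event, long eventTimestamp, Output output) {
            // Track the highest event timestamp seen so far.
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(Output output) {
            // Watermark = highest timestamp minus the out-of-orderness bound, minus 1.
            output.emitWatermark(currentMaxTimestamp - maxOutOfOrderness - 1);
        }
    }

    /** Feeds the timestamps to the generator and simulates one periodic tick after each event. */
    static List<Long> simulate(long[] eventTimestamps, long maxOutOfOrderness) {
        // The strategy is just a lambda that hands out a fresh generator.
        Strategy<String> strategy = () -> new BoundedGenerator<>(maxOutOfOrderness);
        Generator<String> generator = strategy.createWatermarkGenerator();

        List<Long> emitted = new ArrayList<>();
        Output output = wm -> emitted.add(wm);
        for (long ts : eventTimestamps) {
            generator.onEvent("event@" + ts, ts, output);
            generator.onPeriodicEmit(output);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The late event (11000) does not move the watermark backwards.
        System.out.println(simulate(new long[] {10_000L, 12_000L, 11_000L}, 3_500L));
        // prints [6499, 8499, 8499]
    }
}
```

    In a real job Flink calls onPeriodicEmit on a timer (the auto-watermark interval) rather than after every event, so the emitted sequence depends on timing; the arithmetic per emission is the same.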