I'm using Apache Flink 1.11 and want to use some custom WatermarkGenerator.
With the Watermarkstrategy, you can add built-in WatermarkGenerators with ease:
WatermarkStrategy.forMonotonousTimestamps();
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
In the documentation, you can see how to implement a custom Watermarkgenerator, for example a Periodic WatermarkGenerator:
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// emit the watermark as current highest timestamp minus the out-of-orderness bound
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
}
How can i add this custom BoundedOutOfOrdernessGenerator to a Watermarkstrategy?
A WatermarkStrategy is the thing you need to define. So assuming you have some class MyWatermarkGenerator
that implements WatermarkGenerator<MyEvent>
, then you'd do something like:
WatermarkStrategy<WatermarkedRecord> ws = (ctx -> new MyWatermarkGenerator());
...
DataStream<MyEvent> ds = xxx;
ds.assignTimestampsAndWatermarks(ws);
Note that unless your source is setting up timestamps for you (e.g. Kafka record timestamps), you'll want to add a timestamp extractor to your WatermarkStrategy, as in...
WatermarkStrategy<WatermarkedRecord> ws = (ctx -> new MyWatermarkGenerator());
ws = ws.withTimestampAssigner((r, ts) -> r.getTimestamp());