There are a lot of late events in my Flink job, so I set allowedLateness() to 10 minutes (using TumblingEventTimeWindows, with a complex AggregateFunction running on every window).
It seems the aggregation fires on every late event, but I'd like it to fire less frequently.
You could implement a custom Trigger with whatever behavior you desire.
If you look at the implementation of EventTimeTrigger, the default trigger for tumbling event-time windows,
```java
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}
```
you'll see that whenever an element is assigned to a window after the watermark has reached or passed the end of that window (window.maxTimestamp()), the trigger returns FIRE immediately instead of registering a timer. This is why every late event causes another firing, and another emitted result for that window.
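Building on that, here is a minimal sketch of such a custom trigger, assuming non-merging (e.g. tumbling) event-time windows. The class name DelayedLateFiringTrigger and its lateFiringDelay parameter are hypothetical. On-time behavior mirrors EventTimeTrigger; a late element, instead of returning FIRE, schedules a single processing-time firing lateFiringDelay later, so all late elements arriving within that interval are covered by one firing. It also assumes that any event-time timer for the window other than maxTimestamp() is the window operator's cleanup timer at the end of allowed lateness, and uses it to flush still-pending late elements before the window state is dropped.

```java
import java.time.Duration;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Hypothetical trigger (sketch): fires on time like EventTimeTrigger, but batches
 * late elements and fires for them at most once per lateFiringDelay of processing time.
 * Only intended for non-merging windows (canMerge() keeps its default of false).
 */
public class DelayedLateFiringTrigger extends Trigger<Object, TimeWindow> {

    private final long delayMillis;

    // Per-key, per-window state: timestamp of the pending late-firing timer, if any.
    private final ValueStateDescriptor<Long> pendingDesc =
            new ValueStateDescriptor<>("pending-late-firing", Long.class);

    public DelayedLateFiringTrigger(Duration lateFiringDelay) {
        this.delayMillis = lateFiringDelay.toMillis();
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() > ctx.getCurrentWatermark()) {
            // On-time element: same as EventTimeTrigger, fire when the watermark passes the window end.
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
        // Late element: schedule one delayed firing unless one is already pending.
        ValueState<Long> pending = ctx.getPartitionedState(pendingDesc);
        if (pending.value() == null) {
            long fireAt = ctx.getCurrentProcessingTime() + delayMillis;
            ctx.registerProcessingTimeTimer(fireAt);
            pending.update(fireAt);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> pending = ctx.getPartitionedState(pendingDesc);
        Long fireAt = pending.value();
        if (fireAt != null && fireAt == time) {
            pending.clear();
            return TriggerResult.FIRE; // one firing covers every late element since the last one
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE; // regular on-time firing
        }
        // Assumption: any other event-time timer for this window is the operator's
        // cleanup timer (end of allowed lateness). Flush pending late elements now,
        // because the window state and our processing-time timer are about to be cleared.
        ValueState<Long> pending = ctx.getPartitionedState(pendingDesc);
        Long fireAt = pending.value();
        if (fireAt != null) {
            ctx.deleteProcessingTimeTimer(fireAt);
            pending.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ValueState<Long> pending = ctx.getPartitionedState(pendingDesc);
        Long fireAt = pending.value();
        if (fireAt != null) {
            ctx.deleteProcessingTimeTimer(fireAt);
        }
        pending.clear();
    }

    @Override
    public String toString() {
        return "DelayedLateFiringTrigger(" + delayMillis + " ms)";
    }
}
```

You would attach it to the windowed stream with something like .trigger(new DelayedLateFiringTrigger(Duration.ofSeconds(30))), keeping your allowedLateness() setting. Each firing still emits the full, updated window result. The trigger only reduces how often getResult() is called and a result is emitted; the AggregateFunction's add() still runs once per element.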
An alternative would be to have no allowed lateness, but instead collect the late events into their own stream (using a side output), and then process the late events independently.
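A minimal sketch of that alternative, assuming a keyed stream and your own AggregateFunction. The class LateEventsSideOutput, its Result holder, and the tag name "late-events" are hypothetical. With no allowedLateness(), each window fires once when the watermark passes its end, and sideOutputLateData() routes every element that arrives after that point to the side output, which getSideOutput() exposes as an ordinary DataStream.

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.OutputTag;

/** Hypothetical helper (sketch): windowed aggregate without allowed lateness, late events to a side output. */
public final class LateEventsSideOutput {

    /** Holds the on-time window results and the separate stream of late events. */
    public static final class Result<T, R> {
        public final DataStream<R> windowResults;
        public final DataStream<T> lateEvents;

        Result(DataStream<R> windowResults, DataStream<T> lateEvents) {
            this.windowResults = windowResults;
            this.lateEvents = lateEvents;
        }
    }

    private LateEventsSideOutput() {}

    public static <T, K, W extends Window, ACC, R> Result<T, R> aggregateWithLateSideOutput(
            KeyedStream<T, K> keyed,
            WindowAssigner<? super T, W> assigner,
            AggregateFunction<T, ACC, R> aggregate) {
        // Built with explicit TypeInformation because an anonymous OutputTag
        // subclass cannot capture a generic type parameter T.
        OutputTag<T> lateTag = new OutputTag<>("late-events", keyed.getType());

        SingleOutputStreamOperator<R> results = keyed
                .window(assigner)
                // No allowedLateness(): each window fires once, then late elements are diverted.
                .sideOutputLateData(lateTag)
                .aggregate(aggregate);

        return new Result<>(results, results.getSideOutput(lateTag));
    }
}
```

The late-event stream can then be handled on its own terms, for example with its own, coarser window, so the expensive aggregation on the main stream runs exactly once per window.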