I have a custom trigger currently implemented that is supposed to trigger when there has been maxElements
processed or timeoutMs
has elapsed. Currently, the maxElements portion of the trigger works fine and triggers as expected, however the timeout has never been triggered.
In the Flink main job, I have also tried env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
, however this has not been helpful.
Can someone guide me in the right direction? I am lost and unsure where to proceed. Here is my implementation:
public class ElementCountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {
private final long maxElements;
private final long timeoutMs;
private long elementCount = 0;
private long lastTimestamp = Long.MIN_VALUE;
public ElementCountOrTimeoutTrigger(long maxElements, long timeoutMs) {
this.maxElements = maxElements;
this.timeoutMs = timeoutMs;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
elementCount++;
lastTimestamp = timestamp;
if (elementCount >= maxElements) {
elementCount = 0;
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
System.out.println("On processing time called");
System.out.println("Time: " + time);
System.out.println("Last timestamp: " + lastTimestamp);
System.out.println("Timeout: " + timeoutMs);
if (time >= lastTimestamp + timeoutMs) {
elementCount = 0;
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
elementCount = 0;
lastTimestamp = Long.MIN_VALUE;
}
public static <W extends Window> ElementCountOrTimeoutTrigger<W> of(long maxElements, long timeoutMs) {
return new ElementCountOrTimeoutTrigger<>(maxElements, timeoutMs);
}
}
Edit:
Incorporated a timer, however it is not respecting the delete timer calls:
public class ElementCountOrTimeTrigger<W extends Window> extends Trigger<Object, W> {
private final long maxElements;
private final long timeoutMs;
private int elementCount = Integer.MIN_VALUE;
private long lastTimestamp = Long.MIN_VALUE;
private long lastTimerExpire = Long.MIN_VALUE;
public ElementCountOrTimeTrigger(long maxElements, long timeoutMs) {
this.maxElements = maxElements;
this.timeoutMs = timeoutMs;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
elementCount++;
lastTimestamp = ctx.getCurrentProcessingTime();
if (lastTimerExpire > lastTimestamp) {
ctx.deleteProcessingTimeTimer(lastTimerExpire);
System.out.println("Removed the timer due to new element for time: " + lastTimerExpire);
}
lastTimerExpire = lastTimestamp + timeoutMs;
ctx.registerProcessingTimeTimer(lastTimerExpire);
System.out.println("Registered timer until " + lastTimerExpire);
if (elementCount >= maxElements) {
elementCount = 0;
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
System.out.println("Processing time called for time: " + time + " and last timer expire: " + lastTimerExpire);
System.out.println("Firing and purging");
elementCount = Integer.MIN_VALUE;
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
elementCount = Integer.MIN_VALUE;
ctx.deleteProcessingTimeTimer(lastTimerExpire);
}
public static <W extends Window> ElementCountOrTimeTrigger<W> of(long maxElements, long timeoutMs) {
return new ElementCountOrTimeTrigger<>(maxElements, timeoutMs);
}
}
The log it produces:
Registered timer until 1683405995456
Removed the timer due to new element for time: 1683405995456
Registered timer until 1683405997053
Removed the timer due to new element for time: 1683405997053
Registered timer until 1683405999054
Removed the timer due to new element for time: 1683405999054
Registered timer until 1683406001054
Removed the timer due to new element for time: 1683406001054
Registered timer until 1683406003054
Removed the timer due to new element for time: 1683406003054
Registered timer until 1683406005055
Processing time called for time: 1683405995456 and last timer expire: 1683406005055
Firing and purging
Processing time called for time: 1683405997053 and last timer expire: 1683406005055
Firing and purging
Removed the timer due to new element for time: 1683406005055
Registered timer until 1683406007055
Removed the timer due to new element for time: 1683406007055
Registered timer until 1683406009055
Processing time called for time: 1683405999054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406001054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406003054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406005055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406007055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406009055 and last timer expire: 1683406009055
Firing and purging
You haven't registered a processing time timer, so onProcessingTime
will never be called.
You can model the timeout part of your custom trigger on the built-in ProcessingTimeTrigger
, which you'll find here: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
See also this answer, which provides a complete example of something very similar: https://stackoverflow.com/a/49895802/2000823