
How to implement a Trigger in Flink that buffers until a timeout and triggers when timeout elapses?


I want the trigger to register a timer only if there is at least one element in the window, then buffer for one second and fire when that second elapses. If there are no elements in the window, the trigger won't register anything, so I don't expect to see any output.

I don't want the trigger to produce a lot of traffic every second regardless of whether there are elements in the window. On the other hand, if there is only one element in the window, I don't want it to sit there waiting for the watermark, or forever. Instead, I want a timeout so that I can at least see that one element after a second.

Does ProcessingTimeTrigger.create() do this? If so, what is the difference between ProcessingTimeTrigger.create() and ContinuousProcessingTimeTrigger?


Solution

  • A normal one-second-long processing time window will give you a window that contains all of the events that occur during one second, for any second in which there is at least one event. But this window will not be aligned to the first event; it will be aligned to the time-of-day clock. So if, for example, the first event in a window occurs halfway through a given second, then that window will only include the events from the 500 msec following that first event.

    A ProcessingTimeTrigger fires once at the end of the window. A ContinuousProcessingTimeTrigger fires repeatedly at a specified interval.

    To get precisely the semantics you've described, you'll need a custom Trigger. You could do something similar to this OneSecondIntervalTrigger example, except that you'll want to switch from using event time to processing time, and only trigger once, rather than repeatedly.

    That will leave you with something like this:

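    // Imports needed by the trigger (SensorReading is the example's own event type)
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {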
    
        @Override
        public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
            // firstSeen.value() is null until the first element sets it
            ValueState<Boolean> firstSeen = ctx.getPartitionedState(
                new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
    
            // register initial timer only for first element
            if (firstSeen.value() == null) {
                // FIRE the window 1000 msec after the first event
                long now = ctx.getCurrentProcessingTime();
                ctx.registerProcessingTimeTimer(now + 1000);
                firstSeen.update(true);
            }
            // Continue. Do not evaluate window now
            return TriggerResult.CONTINUE;
        }
    
        @Override
        public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
            // Continue. We don't use event time timers
            return TriggerResult.CONTINUE;
        }
    
        @Override
        public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
            // Evaluate the window now
            return TriggerResult.FIRE_AND_PURGE;
        }
    
        @Override
        public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
            // Clear trigger state
            ValueState<Boolean> firstSeen = ctx.getPartitionedState(
                new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
            firstSeen.clear();
        }
    }
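
To make the timing difference concrete, here is a minimal plain-Java sketch that does not use Flink at all. The names TimeoutSketch, alignedWindowEnd and FirstElementTimeout are hypothetical and exist only for this illustration. It compares when a clock-aligned one-second window would close with when a timer armed by the first element would fire, assuming the first event arrives at processing time 10500 msec. As a further assumption, the sketch re-arms itself after firing so that the next element starts a new timeout.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration only; none of these names are Flink API. */
public class TimeoutSketch {

    /** End of the clock-aligned tumbling window of length sizeMs that contains ts. */
    static long alignedWindowEnd(long ts, long sizeMs) {
        return ts - (ts % sizeMs) + sizeMs;
    }

    /** Mimics the trigger logic: only the first buffered element arms the timer. */
    static final class FirstElementTimeout {
        private static final long NO_TIMER = -1L;
        private final long timeoutMs;
        private final List<String> buffer = new ArrayList<>();
        private long timerAt = NO_TIMER;

        FirstElementTimeout(long timeoutMs) {
            this.timeoutMs = timeoutMs;
        }

        void onElement(String element, long nowMs) {
            buffer.add(element);
            if (timerAt == NO_TIMER) {
                // Arm the timer once; later elements leave it untouched
                timerAt = nowMs + timeoutMs;
            }
        }

        /** Returns the buffered elements if the timer is due, otherwise an empty list. */
        List<String> onTime(long nowMs) {
            if (timerAt == NO_TIMER || nowMs < timerAt) {
                return new ArrayList<>();
            }
            List<String> out = new ArrayList<>(buffer);
            buffer.clear();     // analogous to purging the window contents
            timerAt = NO_TIMER; // assumption: the next element re-arms the timer
            return out;
        }

        long timerAt() {
            return timerAt;
        }
    }

    public static void main(String[] args) {
        long firstEvent = 10_500L; // arrives halfway through a second
        System.out.println("aligned window closes at " + alignedWindowEnd(firstEvent, 1_000L)); // 11000

        FirstElementTimeout t = new FirstElementTimeout(1_000L);
        t.onElement("a", firstEvent);
        t.onElement("b", 10_900L); // does not move the timer
        System.out.println("timer fires at " + t.timerAt());  // 11500
        System.out.println("at 11000: " + t.onTime(11_000L)); // []
        System.out.println("at 11500: " + t.onTime(11_500L)); // [a, b]
    }
}
```

With the aligned window, the first event waits only 500 msec before the window closes, whereas the first-element timer gives every buffered batch a full second measured from its first element.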