java, apache-flink, flink-streaming

How to implement a Flink Event Time Trigger that emits after no events received for X minutes


I'm struggling a bit to understand how Flink Triggers work. My datastream contains events with a sessionId, and I aggregate them by that sessionId. Each session will contain a Started and an Ended event; however, sometimes the Ended event is lost.

In order to handle this I've set up a Trigger that will emit the aggregated session whenever the Ended event is processed. But if no events arrive for a session for 2 minutes, I want to emit whatever has been aggregated so far (the apps that send the events emit heartbeats every minute, so if we don't receive any events the session is considered lost).

I've set up the following trigger function:

public class EventTimeProcessingTimeTrigger extends Trigger<HashMap, TimeWindow> {
    private final long sessionTimeout;
    private long lastSetTimer;

    // Max session length set to 1 day
    public static final long MAX_SESSION_LENGTH = 1000L * 86400L;

    // End session events
    private static ImmutableSet<String> endSession = ImmutableSet.<String>builder()
            .add("Playback.Aborted")
            .add("Playback.Completed")
            .add("Playback.Error")
            .add("Playback.StartAirplay")
            .add("Playback.StartCasting")
            .build();

    public EventTimeProcessingTimeTrigger(long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
        ctx.registerProcessingTimeTimer(lastSetTimer);

        if(endSession.contains(element.get(Field.EVENT_TYPE))) {
            return TriggerResult.FIRE_AND_PURGE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ?
                TriggerResult.FIRE_AND_PURGE :
                TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(lastSetTimer);
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + sessionTimeout);
    }
}

In order to assign event time for the events I use the timestamps set by the apps, since the app's event time might not match the wall clock on the server. I extract timestamps and watermarks like this:

DataStream<HashMap> playerEvents = env
                .addSource(kafkaConsumerEvents, "playerEvents(Kafka)")
                .name("Read player events from Kafka")
                .uid("Read player events from Kafka")
                .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
                .name("Map Json to HashMap")
                .uid("Map Json to HashMap")
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<HashMap>(org.apache.flink.streaming.api.windowing.time.Time.seconds(30))
                {
                    @Override
                    public long extractTimestamp(HashMap element)
                    {
                        long timestamp = 0L;
                        Object timestampAsObject = (Object) element.get("CanonicalTime");
                        timestamp = (long)timestampAsObject;
                        return timestamp;
                    }
                })
                .name("Add CanonicalTime as timestamp")
                .uid("Add CanonicalTime as timestamp");

Now what I find strange is that when I run the code in debug mode and set a breakpoint in the Trigger's clear function, it constantly gets called, even when no FIRE_AND_PURGE point is reached in the Trigger. So it feels like I've completely misunderstood how the Trigger is supposed to work, and that my implementation is not at all doing what I think it's doing.

I guess my question is, when should clear be called by the Trigger? And is this the correct way to implement a combined EventTimeTrigger and ProcessingTimeTrigger?

Thankful for all the help I can get.

UPDATE 1: (2020-05-29)

In order to provide some more information about how things are set up: I configure my environment as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(60, Time.of(60, TimeUnit.MINUTES), Time.of(60, TimeUnit.SECONDS)));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

So I use EventTime for the entire stream. I then create the windows like this:

DataStream<PlayerSession> playerSessions = sideEvents
                .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
                .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
                .trigger(new EventTimeProcessingTimeTrigger(SESSION_TIMEOUT))
                .aggregate(new SessionAggregator())
                .name("Aggregate events into sessions")
                .uid("Aggregate events into sessions");

Solution

  • This situation is complex. I hesitate to predict exactly what this code will do, but I can explain some of what’s going on.

    Point 1: you have set the time characteristic to event time, arranged for timestamps and watermarks, and implemented an onEventTime callback in your Trigger. But nowhere are you creating an event time timer. Unless I've missed something, nothing is actually using event time or watermarks. You haven't implemented an event time trigger, and I would not expect that onEventTime will ever be called.
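
    For reference, registering an event time timer in onElement looks roughly like this (it mirrors what Flink's built-in EventTimeTrigger does); without something along these lines, onEventTime never has a timer to respond to:

    @Override
    public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // the watermark is already past the end of the window, so fire right away
            return TriggerResult.FIRE;
        }
        // ask to be called back (via onEventTime) once the watermark reaches the end of the window
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }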

    Point 2: Your trigger doesn't need to call clear. Flink takes care of calling clear on triggers as part of purging windows.

    Point 3: Your trigger is trying to fire and purge the window repeatedly, which doesn't seem right. I say this because you are creating a new processing time timer for every element, and when each timer fires, you are firing and purging the window. You can fire the window as often as you like, but you can only purge the window once, after which it is gone.
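
    If you do keep a custom trigger that sets its own processing time timers, one sketch of how to avoid piling up timers is to delete the previous one before registering a new one (this reuses your lastSetTimer field for brevity; since a single Trigger instance is shared across keys and windows, that value really belongs in partitioned state):

    @Override
    public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // drop the previously registered callback so at most one timer is pending for the window
        ctx.deleteProcessingTimeTimer(lastSetTimer);
        lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
        ctx.registerProcessingTimeTimer(lastSetTimer);

        if (endSession.contains(element.get(Field.EVENT_TYPE))) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }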

    Point 4: Session windows are a special kind of window, known as merging windows. When sessions merge (which happens all the time, as events arrive), their triggers are merged, and one of them gets cleared. This is why you see clear being called so frequently.

    Suggestion: since you have once-a-minute keepalives, and intend to close sessions after 2 minutes of inactivity, it seems like you could set the session gap to be 2 minutes, and that would avoid a fair bit of what's making things so complex. Let the session windows do what they're designed to do.
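
    On its own, that is just a change to the window assigner, with no custom trigger at all (the default ProcessingTimeTrigger fires once the 2 minute gap expires):

    sideEvents
        .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
        .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(2)))
        .aggregate(new SessionAggregator())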

    Assuming that will work, you could then simply extend Flink's ProcessingTimeTrigger and override its onElement method to do this:

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

        // fire and purge immediately when an explicit session-ending event arrives
        if (endSession.contains(((HashMap) element).get(Field.EVENT_TYPE))) {
            return TriggerResult.FIRE_AND_PURGE;
        }

        // otherwise keep the inherited processing-time behavior
        return super.onElement(element, timestamp, window, ctx);
    }
    

    In this fashion the window will be triggered after two minutes of inactivity, or by an explicit session-ending event.

    You should be able to simply inherit the rest of ProcessingTimeTrigger's behavior.
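
    Wiring that into the job is then just a matter of putting the trigger back into the window definition above (EndOfSessionTrigger being a hypothetical name for the subclass sketched here):

    .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(2)))
    .trigger(new EndOfSessionTrigger())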

    If you want to use event time, then use EventTimeTrigger as the superclass, and you'll have to find a way to make sure that your watermarks make progress even when the stream becomes idle. See this answer for how to handle that.
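
    One sketch of keeping the watermark moving during idle periods, using the same periodic-watermark API as your BoundedOutOfOrdernessTimestampExtractor, is an assigner whose watermark trails the wall clock rather than the last event (this assumes the app timestamps are reasonably close to the server clock, which may not hold in your case):

    public class TimeLagWatermarkAssigner implements AssignerWithPeriodicWatermarks<HashMap> {
        // how far the watermark trails behind processing time
        private final long maxTimeLagMillis = 30_000;

        @Override
        public long extractTimestamp(HashMap element, long previousElementTimestamp) {
            return (long) element.get("CanonicalTime");
        }

        @Override
        public Watermark getCurrentWatermark() {
            // derived from processing time, so it keeps advancing even when no events arrive
            return new Watermark(System.currentTimeMillis() - maxTimeLagMillis);
        }
    }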