I was reading the the Flink example CountWithTimestamp and below is a code snippet from the example:
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// retrieve the current count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
current.count++;
// set the state's timestamp to the record's assigned event time timestamp
current.lastModified = ctx.timestamp();
// write the state back
state.update(current);
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// get the state for the key that scheduled the timer
CountWithTimestamp result = state.value();
// check if this is an outdated timer or the latest timer
if (timestamp == result.lastModified + 60000) {
// emit the state on timeout
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
}
My question is that if I remove the if statment timestamp == result.lastModified + 60000
(collect stmt not touched) in the onTimer, and instead replace it by another if statment if(ctx.timestamp < current.lastModified + 60000) { deleteEventTimeTimer(current.lastModified + 60000)}
in the begining of processElement, would the semnatics of the program be the same? any preference of one version over the other in case of same semantics?
You are correct to think that the implementation that deletes the timer has the same semantics. And in fact I recently changed the example used in our training materials to do just that, as I prefer this approach. The reason I find it preferable is that all of the complex business logic is then in one place (in processElement
), and whenever onTimer
is called, you know exactly what to do, no questions asked. Plus, it's more performant, as there are fewer timers to checkpoint and eventually trigger.
This example was written for the docs back before timers could be deleted, and hasn't been updated.
You can find the reworked example I mentioned in these slides -- https://training.ververica.com/decks/process-function/ -- once you get past the registration page.
FWIW, I also recently reworked the reference solution to the corresponding training exercise along the same lines: https://github.com/apache/flink-training/tree/master/long-ride-alerts.