apache-flink, flink-streaming, flink-cep

Flink CepOperator


I am reading the source code of CepOperator from Flink CEP and have a question about the following code snippet:

public void processElement(StreamRecord<IN> element) throws Exception {
    if (isProcessingTime) {
        ... ...
    } else {
        long timestamp = element.getTimestamp();
        IN value = element.getValue();

        if (timestamp > lastWatermark) {
            saveRegisterWatermarkTimer();
            bufferEvent(value, timestamp);
        } else if (lateDataOutputTag != null) {
            output.collect(lateDataOutputTag, element);
        } else {
            numLateRecordsDropped.inc();
        }
    }
}

I don't understand why saveRegisterWatermarkTimer() is called every time a new element is received. Here is its source code:

private void saveRegisterWatermarkTimer() {
    long currentWatermark = timerService.currentWatermark();
    // protect against overflow
    if (currentWatermark + 1 > currentWatermark) {
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
    }
}

It registers a new event-time timer for almost every element. Doesn't this create too many timers?

Thanks in advance for any explanation.


Solution

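  • Registering a timer for currentWatermark + 1 is a common idiom: use it whenever you want to be notified of every arriving watermark (in other words, every time the event-time clock advances). Timers in Flink are automatically deduplicated: for any (key, timestamp) pair there can be at most one timer. Because currentWatermark only changes when a new watermark arrives, every element received for a given key between two watermarks asks for the same timestamp, so at most one such timer per key is pending at a time and there is no risk of creating too many timers.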
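
To make the deduplication argument concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink's actual timer service: the class TimerDedupSketch and its methods registerWatermarkTimer, advanceWatermark and pendingTimers are hypothetical names invented for illustration, and a sorted set per key stands in for Flink's internal timer queue. It only shows that repeated registrations for the same (key, timestamp) pair collapse into one timer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of a keyed event-time timer service; NOT Flink's implementation. */
public class TimerDedupSketch {
    // One sorted set of timestamps per key: a set cannot hold duplicates.
    private final Map<String, TreeSet<Long>> timersByKey = new HashMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Registering the same (key, timestamp) twice leaves a single timer. */
    void registerEventTimeTimer(String key, long timestamp) {
        timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    /** Mirrors saveRegisterWatermarkTimer() from the question, including the overflow guard. */
    void registerWatermarkTimer(String key) {
        if (currentWatermark + 1 > currentWatermark) {
            registerEventTimeTimer(key, currentWatermark + 1);
        }
    }

    /** Advances the watermark, removes every timer at or before it, and returns how many fired. */
    int advanceWatermark(long watermark) {
        currentWatermark = watermark;
        int fired = 0;
        for (TreeSet<Long> timers : timersByKey.values()) {
            Set<Long> due = timers.headSet(watermark, true); // live view of the due timers
            fired += due.size();
            due.clear(); // clearing the view removes them from the backing set
        }
        return fired;
    }

    /** Total number of timers still waiting to fire, across all keys. */
    int pendingTimers() {
        return timersByKey.values().stream().mapToInt(Set::size).sum();
    }

    public static void main(String[] args) {
        TimerDedupSketch service = new TimerDedupSketch();
        service.advanceWatermark(100L);
        // 1,000 elements arrive for the same key before the next watermark.
        for (int i = 0; i < 1_000; i++) {
            service.registerWatermarkTimer("key-a");
        }
        System.out.println(service.pendingTimers());        // 1
        System.out.println(service.advanceWatermark(101L)); // 1
        System.out.println(service.pendingTimers());        // 0
    }
}
```

In this model the cost of the idiom is bounded by the number of active keys, not by the number of elements, which is the property the answer relies on.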