apache-flink flink-streaming

Flink timer not triggered


In my Flink job I have a KeyedProcessFunction.

I have implemented a watermark strategy:

val wmStrategy: WatermarkStrategy<MyInput> =
        WatermarkStrategy.forMonotonousTimestamps<MyInput>()
            .withTimestampAssigner { event: MyInput, _: Long -> event.getTimestampEvent() }

and then I apply it to my source data:

mysource.assignTimestampsAndWatermarks(wmStrategy)

When processElement is called, a timer may be registered with ctx.timerService().registerEventTimeTimer(timerWakeUpInstant.toEpochMilli()), and after that the ValueState is updated. The update is successful.

The next time processElement is called, valueState.value() returns null instead of the last updated value. No clear() is called explicitly on the value state. The timer is never triggered.

At the moment, I'm testing in a 'clean' environment: reading from a text file whose data refers to a single key, with parallelism = 1, running in my IDE.

Can you help me? Why is the state nullified? And why is the timer not triggered?


Solution

  • I tested this myself: onTimer is not called until the function that registered the timer receives a message that advances the watermark.

    With event-time timers, the onTimer(...) method is called when the current watermark is advanced up to or beyond the timestamp of the timer.

    The "current" watermark actually refers to the operator, not the job. This was misleading for me, as I thought it was centralized.

    Looking at a code sample in the documentation, we can find a useful comment that gives us a hint:

    //trigger event time timers by advancing the event time of the operator with a watermark
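
To make the rule above concrete, here is a minimal toy model in plain Kotlin. It is not Flink code and does not use any Flink API: ToyOperator, onWatermark, firedTimers and demo are hypothetical names made up for this illustration. It only sketches the behaviour described in this answer, namely that each operator tracks its own watermark and a registered event-time timer fires only once that watermark reaches or passes the timer's timestamp.

```kotlin
import java.util.TreeSet

// Toy model of a single operator's event-time timers (hypothetical, not a Flink class).
class ToyOperator {
    // Watermark as seen by THIS operator only; nothing has arrived yet.
    var currentWatermark: Long = Long.MIN_VALUE
        private set

    // Registered timers, kept sorted by timestamp (duplicates collapse).
    private val pendingTimers = TreeSet<Long>()

    // Timestamps for which the timer callback would have been invoked.
    val firedTimers = mutableListOf<Long>()

    fun registerEventTimeTimer(timestamp: Long) {
        pendingTimers.add(timestamp)
    }

    // Called when a watermark reaches this operator.
    fun onWatermark(watermark: Long) {
        if (watermark <= currentWatermark) return // a watermark never moves backwards
        currentWatermark = watermark
        // View of all timers with timestamp <= watermark, in ascending order.
        val due = pendingTimers.headSet(watermark, true)
        firedTimers.addAll(due)
        due.clear() // clearing the view removes them from pendingTimers
    }
}

fun demo(): List<Long> {
    val op = ToyOperator()
    op.registerEventTimeTimer(1_000L)
    op.registerEventTimeTimer(5_000L)
    op.onWatermark(999L)   // below both timers: nothing fires
    op.onWatermark(1_000L) // reaches the first timer: it fires
    return op.firedTimers.toList()
}

fun main() {
    println(demo()) // prints [1000]
}
```

In this sketch the timer for 5_000 stays pending because no later watermark reaches the operator, which mirrors the symptom in the question: registering a timer is not enough, the operator must also see a watermark at or beyond the timer's timestamp.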