In my Flink job I have a KeyedProcessFunction.
I have implemented a watermark strategy:
val wmStrategy: WatermarkStrategy<MyInput> =
WatermarkStrategy.forMonotonousTimestamps<MyInput>()
.withTimestampAssigner { event: MyInput, _: Long -> event.getTimestampEvent() }
and then I apply it to my source data:
mysource.assignTimestampsAndWatermarks(wmStrategy)
When processElement is called, a timer may be registered with ctx.timerService().registerEventTimeTimer(timerWakeUpInstant.toEpochMilli()), and after that the ValueState is updated. The update is successful.
The next time processElement is called, valueState.value() returns null instead of the last updated value.
clear() is never called explicitly on the value state.
The timer is never triggered.
At the moment, I'm testing in a 'clean' environment: reading from a text file whose data refers to a single key, with parallelism = 1, running in my IDE.
Can you help me? Why is the state nullified? And why is the timer not triggered?
I investigated this myself: onTimer is not called until the function that registered the timer receives a message that advances the watermark.
With event-time timers, the onTimer(...) method is called when the current watermark is advanced up to or beyond the timestamp of the timer.
The "current" watermark actually refers to the operator, not the job. This was misleading for me, as I thought it was centralized.
Looking at the code samples in the documentation, we can find a useful comment that gives a hint:
//trigger event time timers by advancing the event time of the operator with a watermark
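To make the per-operator behaviour concrete, here is a minimal toy model in plain Kotlin. It is not Flink code and not how Flink is implemented internally; ToyOperator and its members are hypothetical names invented for illustration. It only mimics the rule quoted above: a timer fires once the operator's own watermark reaches or passes the timer's timestamp, and not before.

```kotlin
import java.util.TreeSet

// Toy model only (assumption: NOT Flink's implementation).
// It mimics a single operator that tracks its own watermark
// and fires event-time timers when that watermark advances.
class ToyOperator {
    // Watermark as seen by this operator only.
    var currentWatermark: Long = Long.MIN_VALUE
        private set

    // Pending event-time timers, kept sorted by timestamp.
    private val timers = TreeSet<Long>()

    // Timestamps for which the timer callback has run, in firing order.
    val fired = mutableListOf<Long>()

    fun registerEventTimeTimer(timestamp: Long) {
        timers.add(timestamp)
    }

    // Called when a watermark reaches this operator.
    fun processWatermark(watermark: Long) {
        if (watermark <= currentWatermark) return // the watermark never moves backwards
        currentWatermark = watermark
        // Fire every pending timer whose timestamp is at or below the new watermark.
        while (timers.isNotEmpty() && timers.first() <= currentWatermark) {
            onTimer(timers.pollFirst()!!)
        }
    }

    private fun onTimer(timestamp: Long) {
        fired.add(timestamp)
    }
}

fun main() {
    val op = ToyOperator()
    op.registerEventTimeTimer(1_000L)

    op.processWatermark(999L)   // watermark is still below the timer
    println(op.fired)           // prints []

    op.processWatermark(1_000L) // watermark reaches the timer timestamp
    println(op.fired)           // prints [1000]
}
```

In this model, registering a timer does nothing by itself: the callback runs only inside processWatermark, which matches the observation that onTimer is not called until a watermark at or beyond the timer's timestamp reaches the operator.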