I want to keep a single value that is shared across all keys in a KeyedProcessFunction,
like below:
    class StateC() extends KeyedProcessFunction[Long, A, B] {
      var timestamp: Timestamp = _

      override def open(parameters: Configuration): Unit = {
        timestamp = assignCurrentHour()
      }

      override def processElement(item: A, ... ): Unit = {
        val currentHour = now.truncateToHour()
        if (currentHour.after(timestamp)) {
          timestamp = assignCurrentHour()
        }
        ....
      }
    }
I just want to find out whether I am in a new hour or not. For that I keep the timestamp variable. The value of the timestamp variable is common for all keys in this TaskManager, so I do not need state for each key.
In this case, the timestamp variable will be updated when any event is processed by this task manager in the new hour, right?
Could there be concurrent modification of the timestamp variable?
The implementation you proposed looks fine (but keep in mind I'm not aware of all of your requirements). Each parallel instance of the KeyedProcessFunction will have its own version of the timestamp, and each will update its timestamp as soon as an event is processed in the new hour. A KeyedProcessFunction is single threaded: you don't have to worry about concurrent updates. (The onTimer method is synchronized with the others; there's nothing to worry about there.)
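
To make the hour check concrete, here is a minimal sketch of the rollover logic on its own, without any Flink dependency. The names `HourTracker`, `rolledOver` and `truncateToHour` are made up for this illustration and are not part of Flink; the sketch also assumes the time is passed in as epoch milliseconds rather than read from the clock, which may or may not match what `assignCurrentHour()` does in the question.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Hypothetical helper (not a Flink class): remembers the latest hour seen
// by one parallel instance. It is a plain mutable field holder with no
// synchronization, relying on the function being called from a single thread.
final class HourTracker(startMillis: Long) {
  private var currentHour: Instant = HourTracker.truncateToHour(startMillis)

  // Returns true and advances the stored hour if nowMillis falls in a later
  // hour than the one currently stored; otherwise returns false.
  def rolledOver(nowMillis: Long): Boolean = {
    val hour = HourTracker.truncateToHour(nowMillis)
    if (hour.isAfter(currentHour)) {
      currentHour = hour
      true
    } else {
      false
    }
  }
}

object HourTracker {
  // Drops minutes, seconds and milliseconds from an epoch-millis timestamp.
  def truncateToHour(millis: Long): Instant =
    Instant.ofEpochMilli(millis).truncatedTo(ChronoUnit.HOURS)
}
```

Inside the function, one such tracker per instance could be created in `open()` (for example with `System.currentTimeMillis()`) and `rolledOver` called at the top of `processElement`. Since every parallel instance calls its own `open()`, each one ends up with its own tracker, which matches the behaviour described above.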