
Flink common state for all keys in the KeyedProcessFunction


I want to keep a single value shared by all keys in the KeyedProcessFunction, like below:

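import java.sql.Timestamp // assuming Timestamp is java.sql.Timestamp
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class StateC() extends KeyedProcessFunction[Long, A, B] {
  // Plain field, not Flink keyed state: one copy per parallel instance.
  var timestamp: Timestamp = _

  override def open(parameters: Configuration): Unit = {
    timestamp = assignCurrentHour() // helper from the question, not shown
  }

  override def processElement(
      item: A,
      ctx: KeyedProcessFunction[Long, A, B]#Context,
      out: Collector[B]): Unit = {
    val currentHour = now.truncateToHour() // helper from the question, not shown
    if (currentHour.after(timestamp)) {
      timestamp = assignCurrentHour()
    }
    // ...
  }
}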

I just want to find out whether I am in a new hour or not. For that, I keep the timestamp variable. The value of the timestamp variable is common to all keys in this TaskManager, so I do not need to keep state for each key.
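The hour-rollover check itself can be isolated from Flink. The sketch below is one possible version, assuming `java.time.Instant` instead of the question's `Timestamp` and its `assignCurrentHour()` / `now.truncateToHour()` helpers, which are not shown. `HourTracker` and `isNewHour` are hypothetical names. Inside `processElement` you could feed it something like `Instant.ofEpochMilli(ctx.timerService().currentProcessingTime())` if processing time is what you want.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Hypothetical helper (not part of Flink or the question): remembers the start
// of the hour in which it last saw an event and reports when a later hour begins.
final class HourTracker(start: Instant) {
  // Start of the current hour, e.g. 10:15:00Z becomes 10:00:00Z.
  private var hourStart: Instant = start.truncatedTo(ChronoUnit.HOURS)

  def currentHour: Instant = hourStart

  // True only for the first instant seen in a later hour; earlier or
  // same-hour instants leave the tracked hour unchanged.
  def isNewHour(now: Instant): Boolean = {
    val candidate = now.truncatedTo(ChronoUnit.HOURS)
    if (candidate.isAfter(hourStart)) {
      hourStart = candidate
      true
    } else {
      false
    }
  }
}
```

For example, a tracker started at 10:15Z returns `false` for 10:59:59Z, `true` for 11:00:01Z, and `false` again for 11:30Z, after which `currentHour` is 11:00:00Z. Comparing truncated instants avoids the separate "assign" step in the original code.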

In this case, the timestamp variable will be updated when any event is processed by this TaskManager in the new hour, right?

Could there be concurrent modification of the timestamp variable?


Solution

  • The implementation you proposed looks fine (but keep in mind I'm not aware of all of your requirements). Each parallel instance of the KeyedProcessFunction has its own copy of timestamp, so a TaskManager running several parallel instances holds several independent copies, and each instance updates its copy as soon as it processes an event in the new hour. A KeyedProcessFunction is single-threaded, so you don't have to worry about concurrent updates. (The onTimer method is never called concurrently with processElement, so there's nothing to worry about there either.)
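To make the "one copy per parallel instance" point concrete, here is a plain-Scala model with no Flink dependency. It is only an illustration: `Subtask`, `PerInstanceModel`, and the modulo routing are assumptions made for this sketch; Flink actually hashes keys into key groups and assigns those to subtasks.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Plain-Scala model with no Flink dependency. Each Subtask stands in for one
// parallel instance of the KeyedProcessFunction and owns its own field, the
// way each instance of StateC owns its own `timestamp`.
final class Subtask(start: Instant) {
  var lastHour: Instant = start.truncatedTo(ChronoUnit.HOURS)

  // Only this subtask's thread calls process, so no locking is needed.
  def process(eventTime: Instant): Unit = {
    val hour = eventTime.truncatedTo(ChronoUnit.HOURS)
    if (hour.isAfter(lastHour)) lastHour = hour
  }
}

object PerInstanceModel {
  // Assumption for illustration: modulo routing. Flink actually hashes keys
  // into key groups, which are then assigned to subtasks.
  def route(key: Long, parallelism: Int): Int =
    Math.floorMod(key, parallelism.toLong).toInt

  // Sends each (key, time) event to one subtask and returns all subtasks.
  def run(events: Seq[(Long, Instant)], parallelism: Int, start: Instant): Vector[Subtask] = {
    val subtasks = Vector.fill(parallelism)(new Subtask(start))
    for ((key, time) <- events) subtasks(route(key, parallelism)).process(time)
    subtasks
  }
}
```

With parallelism 2 and a start of 10:00Z, an event for key 2 at 11:05Z moves only subtask 0 into the 11:00 hour; subtask 1 keeps 10:00 until it processes an 11:xx event itself. That is why "updated when any event is processed by this TaskManager" only holds per parallel instance.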