
Are processElement1 and processElement2 in KeyedCoProcessFunction atomic with respect to state, or can they modify shared state concurrently?


I have two streams: a data stream that contains only a flag, used to set the ValueState passing to true or false, and a control stream that adds a user to be notified to a MapState. When passing changes from false to true, a notification is issued to every user in the MapState who has not yet been notified. (In the code below, the first input carries users and the second carries the flag.)

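Here's how the state transitions:

  • passing false → true: every user in the MapState whose flag is false receives a notification and is marked as notified.
  • passing true → false: every user's notified flag is reset to false, so they are notified again the next time passing becomes true.
  • A user added while passing is true is notified immediately.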

Here is the KeyedCoProcessFunction that handles this logic.

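import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

// Input 1: users to notify. Input 2: "true"/"false" passing flags. Both keyed by String.
class TestKeyedCoProcessFunction extends KeyedCoProcessFunction[String, String, String, String] {

    // user -> was_notified; the value type is java.lang.Boolean to match Types.BOOLEAN
    @transient private var notified: MapState[String, java.lang.Boolean] = _
    // issue notifications when this is updated to passing=true
    @transient private var passing: ValueState[java.lang.Boolean] = _

    override def open(parameters: Configuration): Unit = {
      val notifiedDescriptor = new MapStateDescriptor("notified", Types.STRING, Types.BOOLEAN)
      notified = getRuntimeContext.getMapState(notifiedDescriptor)

      val passingDescriptor = new ValueStateDescriptor("passing", Types.BOOLEAN)
      passing = getRuntimeContext.getState(passingDescriptor)
      // Keyed state cannot be read here: open() runs before any key is set.
      // A missing value is treated as passing=false in isPassing instead.
    }

    // null means this key has not received a flag yet, i.e. passing=false
    private def isPassing: Boolean = {
      val value = passing.value()
      value != null && value.booleanValue()
    }

    // Registers a user as not yet notified; returns false if the user is already known.
    def addUser(user: String): Boolean = {
      if (notified.contains(user)) {
        false
      } else {
        notified.put(user, false)
        true
      }
    }

    // Updates the flag; returns true if its value actually changed.
    def setPassing(newPassing: String): Boolean = {
      if (isPassing) {
        if (newPassing == "true") {
          false
        } else {
          passing.update(false)
          true
        }
      } else {
        if (newPassing == "false") {
          false
        } else {
          passing.update(true)
          true
        }
      }
    }

    // Emits one message per user not yet notified and marks them as notified.
    def notifyNotNotifiedUsers(collector: Collector[String]): Unit = {
      val keys = notified.keys().iterator()
      while (keys.hasNext) {
        val user = keys.next()
        val userNotified = notified.get(user)
        if (!userNotified) {
          collector.collect("Hey " + user + " passing=true")
          notified.put(user, true)
        }
      }
    }

    // Resets every user's flag so they are notified on the next false -> true change.
    def setNotifiedFalseAll(): Unit = {
      val keys = notified.keys().iterator()
      while (keys.hasNext) {
        val user = keys.next()
        val userNotified = notified.get(user)
        if (userNotified) {
          notified.put(user, false)
        }
      }
    }

    override def processElement1(user: String,
                                 context: KeyedCoProcessFunction[String, String, String, String]#Context,
                                 collector: Collector[String]): Unit = {
      addUser(user)
      if (isPassing) {
        notifyNotNotifiedUsers(collector)
      }
    }

    override def processElement2(newPassing: String,
                                 context: KeyedCoProcessFunction[String, String, String, String]#Context,
                                 collector: Collector[String]): Unit = {
      val modified = setPassing(newPassing)
      if (isPassing) {
        notifyNotNotifiedUsers(collector)
      } else {
        if (modified) {
          setNotifiedFalseAll()
        }
      }
    }
  }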
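To check the transitions without a Flink cluster, the per-key logic can be modeled as a pure function. This is a minimal sketch, not the code above: `NotifierModel`, `Event`, `AddUser`, `SetPassing`, and `KeyState` are hypothetical names, immutable values stand in for ValueState and MapState, and pending users are notified in sorted order only to make the output deterministic (Flink's MapState iteration order is not something to rely on).

```scala
// Hypothetical, Flink-free model of one key's state in TestKeyedCoProcessFunction.
// Immutable values stand in for ValueState (passing) and MapState (notified).
object NotifierModel {
  sealed trait Event
  final case class AddUser(user: String) extends Event    // first input
  final case class SetPassing(flag: String) extends Event // second input

  final case class KeyState(passing: Boolean, notified: Map[String, Boolean])

  val initial: KeyState = KeyState(passing = false, notified = Map.empty)

  // Emit one message per not-yet-notified user and mark them as notified.
  // Users are sorted only to make the output deterministic.
  private def notifyPending(s: KeyState): (KeyState, List[String]) = {
    val pending = s.notified.toList.collect { case (u, false) => u }.sorted
    val msgs = pending.map(u => s"Hey $u passing=true")
    (s.copy(notified = s.notified ++ pending.map(u => u -> true)), msgs)
  }

  def step(s: KeyState, e: Event): (KeyState, List[String]) = e match {
    case AddUser(u) =>
      val added =
        if (s.notified.contains(u)) s
        else s.copy(notified = s.notified + (u -> false))
      if (added.passing) notifyPending(added) else (added, Nil)

    case SetPassing(flag) =>
      // Mirrors setPassing: when passing, anything but "true" turns it off;
      // when not passing, anything but "false" turns it on.
      val newPassing = if (s.passing) flag == "true" else flag != "false"
      val modified = newPassing != s.passing
      val updated = s.copy(passing = newPassing)
      if (updated.passing) notifyPending(updated)
      else if (modified) (updated.copy(notified = updated.notified.map { case (u, _) => u -> false }), Nil)
      else (updated, Nil)
  }

  // Apply events in order and collect every emitted message.
  def run(events: Seq[Event]): List[String] =
    events.foldLeft((initial, List.empty[String])) { case ((s, out), e) =>
      val (next, msgs) = step(s, e)
      (next, out ++ msgs)
    }._2

  def main(args: Array[String]): Unit =
    run(Seq(AddUser("alice"), SetPassing("true"), AddUser("bob"),
            SetPassing("false"), SetPassing("true"))).foreach(println)
}
```

Running `main` prints a notification for alice on the first false → true change, one for bob as soon as he is added, and then one for each of them again after the reset and the second false → true change.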

Is it possible for a race condition to occur in Flink, with processElement1 and processElement2 executing simultaneously? For example:

t+1 processElement2("true")
t+2 processElement2: setPassing("true")
t+3 processElement2: notifyNotNotifiedUsers() // starts iterating over the MapState
t+4 processElement1("new_user")
t+5 processElement1: addUser(user) // adds a user to the MapState
t+6 processElement1: notifyNotNotifiedUsers() // starts a second, parallel iteration over the MapState, possibly causing a missed or duplicate notification
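If the two methods never overlap, the timeline above can only play out in one of two serial orders: the flag first, or the new user first. The sketch below replays both orders, assuming each call runs to completion before the next starts. `SerialNotifier` and `RaceScenario` are hypothetical names, plain fields stand in for the keyed state, and an existing user `alice` is registered first for illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for one key's state: plain fields replace ValueState and
// MapState, and each method runs to completion before the next one is called.
final class SerialNotifier {
  private var passing = false
  private val notified = mutable.LinkedHashMap.empty[String, Boolean] // user -> was_notified
  val out = mutable.ListBuffer.empty[String]

  // Iterate over a snapshot so the map can be updated inside the loop.
  private def notifyPending(): Unit =
    for ((user, done) <- notified.toList if !done) {
      out += s"Hey $user passing=true"
      notified(user) = true
    }

  def processElement1(user: String): Unit = {
    if (!notified.contains(user)) notified(user) = false
    if (passing) notifyPending()
  }

  def processElement2(flag: String): Unit = {
    val newPassing = if (passing) flag == "true" else flag != "false"
    val modified = newPassing != passing
    passing = newPassing
    if (passing) notifyPending()
    else if (modified) notified.keys.toList.foreach(u => notified(u) = false)
  }
}

object RaceScenario {
  // Replays the question's two calls in the given serial order.
  def run(flagFirst: Boolean): List[String] = {
    val n = new SerialNotifier
    n.processElement1("alice") // registered while passing = false
    if (flagFirst) { n.processElement2("true"); n.processElement1("new_user") }
    else { n.processElement1("new_user"); n.processElement2("true") }
    n.out.toList
  }

  def main(args: Array[String]): Unit = {
    println(run(flagFirst = true))
    println(run(flagFirst = false))
  }
}
```

In both orders each user receives exactly one notification; only the call that emits new_user's message differs.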

Solution

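  • There's no possibility of a race condition within any given instance of a KeyedCoProcessFunction, or within any of Flink's other user function interfaces, for that matter. Flink serializes calls to processElement1, processElement2, and onTimer for a given parallel instance (in recent versions they all run on the task's single mailbox thread), so each call runs to completion before the next one starts. processElement1 and processElement2 cannot execute concurrently, and onTimer is safe as well.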
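To illustrate why serialized calls remove the need for locking, here is a plain-Scala analogy. It does not use Flink: `java.util.concurrent.Executors.newSingleThreadExecutor` stands in for the task thread, two producer threads play the two inputs, and `MailboxSketch` and `perInput` are hypothetical names. Because every mutation of the unsynchronized counter is queued and run on the one worker thread, no increments are lost.

```scala
import java.util.concurrent.{Callable, Executors}
import scala.collection.mutable

// Analogy only, not Flink's API: a single-threaded executor plays the task thread.
object MailboxSketch {
  def run(perInput: Int): (Int, Int) = {
    val mailbox = Executors.newSingleThreadExecutor()
    var counter = 0                         // plain var, no locks
    val workers = mutable.Set.empty[String] // names of threads that mutated state

    // Each "input" thread only enqueues work; it never touches the state itself.
    def input(): Thread = new Thread(new Runnable {
      def run(): Unit =
        for (_ <- 1 to perInput)
          mailbox.execute(new Runnable {
            def run(): Unit = {
              counter += 1
              workers += Thread.currentThread().getName
            }
          })
    })

    val inputs = List(input(), input())
    inputs.foreach(_.start())
    inputs.foreach(_.join())
    // FIFO: this read runs after every queued mutation; Future.get publishes the result.
    val result = mailbox.submit(new Callable[(Int, Int)] {
      def call(): (Int, Int) = (counter, workers.size)
    }).get()
    mailbox.shutdown()
    result
  }

  def main(args: Array[String]): Unit = {
    val (count, threads) = run(perInput = 10000)
    println(s"count=$count, mutating threads=$threads") // count=20000, mutating threads=1
  }
}
```

The same reasoning applies to the MapState iteration in the question: the iteration started by one call finishes before any other call for that instance can add a user.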