apache-flink, flink-streaming

In Apache Flink, should we update state per collect or per input?


Imagine a case where the input is a filename, and we want to use a Flink RichFlatMapFunction to update state and emit the lines of the file (each file contains, say, 10k lines). Where should I update the state to guarantee exactly-once delivery? Here are two solutions:

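import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector

// solution 1: write to Flink state after every line
class MyOp extends RichFlatMapFunction[String, String] {
 ...
 override def flatMap(filename: String, out: Collector[String]): Unit = {
    val state = Option(flinkState.value()).getOrElse(defaultState)
    for (line <- read(filename)) {
      state.update(line)
      flinkState.update(state)  // one Flink state write per emitted line
      out.collect(line)
    }
 }
}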
// solution 2: write to Flink state once per input
class MyOp extends RichFlatMapFunction[String, String] {
 ...
 override def flatMap(filename: String, out: Collector[String]): Unit = {
    val state = Option(flinkState.value()).getOrElse(defaultState)
    for (line <- read(filename)) {
      state.update(line)        // update only the local copy per line
      out.collect(line)
    }
    flinkState.update(state)    // one Flink state write per input file
 }
}

Solution

  • In terms of correctness, it doesn't make any difference. Checkpointing never occurs during an invocation of a user function (such as the flatMap method of your RichFlatMapFunction), so a checkpoint reflects the state either before an event passed to flatMap is processed or after it has been fully processed, never partway through a file.

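    In terms of performance, solution 2 is much better: it writes to Flink state once per input file instead of once per emitted line (about 10k writes per file in your example), and each state write can be costly. With the RocksDB state backend, for instance, every update() serializes the value.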
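Both points can be illustrated with a toy model in plain Scala that has no Flink dependency. `CountingValueState`, `read`, `solution1`, `solution2` and `run` are hypothetical stand-ins, not Flink APIs: the state is reduced to a running line count, and the driver takes a "checkpoint" only between flatMap invocations, which is the property the answer relies on. This is a sketch of the reasoning, not a model of Flink's actual checkpointing machinery.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Flink's ValueState that counts how often it is written.
final class CountingValueState[T](initial: T) {
  private var current: T = initial
  var writes: Int = 0
  def value(): T = current
  def update(v: T): Unit = { current = v; writes += 1 }
}

object StateUpdateDemo {
  type Process = (CountingValueState[Long], String, Int, mutable.Buffer[String]) => Unit

  // Hypothetical helper: pretend every file has `n` lines.
  def read(filename: String, n: Int): Seq[String] =
    (1 to n).map(i => s"$filename-line-$i")

  // Solution 1: write to the state after every emitted line.
  def solution1(state: CountingValueState[Long], filename: String, n: Int,
                out: mutable.Buffer[String]): Unit = {
    var count = state.value()
    for (line <- read(filename, n)) {
      count += 1
      state.update(count)
      out += line
    }
  }

  // Solution 2: update a local copy per line, write to the state once per input.
  def solution2(state: CountingValueState[Long], filename: String, n: Int,
                out: mutable.Buffer[String]): Unit = {
    var count = state.value()
    for (line <- read(filename, n)) {
      count += 1
      out += line
    }
    state.update(count)
  }

  // Process inputs one at a time; a "checkpoint" snapshot is taken only
  // between invocations, never in the middle of one.
  def run(process: Process, files: Seq[String], n: Int): (Seq[Long], Int, Int) = {
    val state = new CountingValueState[Long](0L)
    val out = mutable.Buffer.empty[String]
    val snapshots = Seq.newBuilder[Long]
    for (f <- files) {
      process(state, f, n, out)
      snapshots += state.value()
    }
    (snapshots.result(), state.writes, out.size)
  }

  def main(args: Array[String]): Unit = {
    val files = Seq("a.txt", "b.txt", "c.txt")
    val (snap1, writes1, emitted1) = run(solution1, files, 10000)
    val (snap2, writes2, emitted2) = run(solution2, files, 10000)
    println(s"solution 1: snapshots=$snap1 writes=$writes1 emitted=$emitted1")
    println(s"solution 2: snapshots=$snap2 writes=$writes2 emitted=$emitted2")
  }
}
```

With three files of 10,000 lines each, both variants emit 30,000 lines and produce the same snapshots (10000, 20000, 30000), but solution 1 performs 30,000 state writes while solution 2 performs 3.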