Imagine a case where the input is a filename, and we want to use Flink's `RichFlatMapFunction` to update state and emit the lines of the file (each file contains, say, 10k lines). I'm wondering where I should update the state to guarantee exactly-once semantics. Here are two solutions:
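```scala
// solution 1: write the Flink state after every line
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector

class MyOp extends RichFlatMapFunction[String, String] {
  ...
  override def flatMap(filename: String, out: Collector[String]): Unit = {
    val state = Option(flinkState.value()).getOrElse(defaultState)
    for (line <- read(filename)) {
      state.update(line)
      flinkState.update(state) // one state write per line
      out.collect(line)
    }
  }
}
```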
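```scala
// solution 2: write the Flink state once, after the whole file is processed
class MyOp extends RichFlatMapFunction[String, String] {
  ...
  override def flatMap(filename: String, out: Collector[String]): Unit = {
    val state = Option(flinkState.value()).getOrElse(defaultState)
    for (line <- read(filename)) {
      state.update(line) // only the local copy changes inside the loop
      out.collect(line)
    }
    flinkState.update(state) // a single state write per file
  }
}
```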
In terms of correctness, it doesn't make any difference. A checkpoint never occurs during an invocation of a user function (like your `RichFlatMapFunction`): the task handles checkpoint barriers between records, so a checkpoint reflects the state either before the event passed to the `flatMap` method was processed, or after it was fully processed.
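In terms of performance, solution 2 is much better: it writes to Flink state once per file instead of once per line (10k writes per file), and with the RocksDB state backend every `update()` serializes the state object.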
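To make both claims concrete, here is a small self-contained Scala model. It is not Flink code: `FakeValueState`, `readFile`, `Record`, `Barrier` and `run` are hypothetical stand-ins, the state is simplified to a running line count, and it assumes (as in Flink's task thread) that a checkpoint barrier is only handled between two `flatMap` calls.

```scala
// Hypothetical model, NOT the Flink API: a single-threaded task that only
// takes a snapshot between two flatMap calls, as Flink's task thread does.
import scala.collection.mutable.ArrayBuffer

sealed trait Event
final case class Record(filename: String) extends Event
case object Barrier extends Event

// Stand-in for ValueState: holds the last value written and counts writes.
final class FakeValueState[T](init: T) {
  private var v: T = init
  var writes = 0
  def value(): T = v
  def update(x: T): Unit = { v = x; writes += 1 }
}

// Hypothetical stand-in for `read(filename)`: n lines per file.
def readFile(filename: String, n: Int): Seq[String] =
  (1 to n).map(i => s"$filename:$i")

def solution1(st: FakeValueState[Long], file: String, n: Int, out: ArrayBuffer[String]): Unit = {
  var count = st.value()
  for (line <- readFile(file, n)) {
    count += 1
    st.update(count) // write after every line
    out += line
  }
}

def solution2(st: FakeValueState[Long], file: String, n: Int, out: ArrayBuffer[String]): Unit = {
  var count = st.value()
  for (line <- readFile(file, n)) {
    count += 1
    out += line
  }
  st.update(count) // single write after the whole file
}

// Processes events in order; a barrier is handled only between records,
// so a snapshot never observes a half-processed file.
def run(events: Seq[Event], n: Int,
        op: (FakeValueState[Long], String, Int, ArrayBuffer[String]) => Unit): (Seq[Long], Int) = {
  val st = new FakeValueState[Long](0L)
  val out = ArrayBuffer.empty[String]
  val snapshots = ArrayBuffer.empty[Long]
  events.foreach {
    case Record(f) => op(st, f, n, out)
    case Barrier   => snapshots += st.value()
  }
  (snapshots.toSeq, st.writes)
}

val events = Seq(Record("a"), Barrier, Record("b"), Record("c"), Barrier)
val (snaps1, writes1) = run(events, 10000, solution1)
val (snaps2, writes2) = run(events, 10000, solution2)
println(s"snapshots: $snaps1 vs $snaps2")
println(s"state writes: $writes1 vs $writes2")
```

Both variants take identical snapshots, because a barrier never lands in the middle of a file; only the number of state writes differs (30000 for solution 1 versus 3 for solution 2 in this run).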