If a Flink application is starting back up after a failure or is updated, are class variables that are not explicitly part of KeyedState or OperatorState persisted?
For example, the BoundedOutOfOrdernessGenerator described in Flink's documentation has a currentMaxTimestamp variable. If a Flink application is updated, will the value in currentMaxTimestamp be lost, or does that get written to the savepoint created before the application is updated?
The real reason I ask is that I would like to implement a custom watermark generator (similar to this) that switches to processing time when generating watermarks if a source has been idle for too long. However, I am hoping to detect that the application is coming back online after an update or failure by checking whether the class variables have been reset to their original default values (say Long.MIN_VALUE in the example from the link I provided above). This way, I could ensure that the watermark generator does not mistake an application update that took five minutes for a source being idle for five minutes.
Additionally, does Flink restart each watermark generator operator if an application is updated, even if no changes were made to the watermark generator?
Only state that is explicitly managed by Flink is persisted -- so yes, the value in currentMaxTimestamp is lost when restoring from a snapshot. The current watermarks are also not included in snapshots.
What I think you could do -- though I haven't tried it -- would be to have your watermark generator implement the CheckpointedFunction interface. You can then implement these two methods:
public void snapshotState(FunctionSnapshotContext context)
public void initializeState(FunctionInitializationContext context)
In the initializeState method you have access to context.isRestored(), which lets you know if you are restarting from a snapshot.
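As a rough illustration of the idea in the question, here is a minimal, Flink-free sketch of the idle-detection logic. It relies only on the point above that plain fields come back at their defaults after a restart. The class name, the constructor parameters, and the injected clock are all hypothetical, and this is not tested against Flink itself; in a real job the two methods would be called from the event and periodic-emit callbacks of whichever watermark API you are using.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch (no Flink dependency) of idle-aware watermark logic. */
class IdleAwareWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier clock; // injected so the logic can be tested

    // Plain fields: not managed state, so after a restart they hold these defaults again.
    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastActivityWallClock = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
    }

    /** Call for every record: track the max event time and note the activity. */
    void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        lastActivityWallClock = clock.getAsLong();
    }

    /** Call periodically: returns the watermark to emit, or Long.MIN_VALUE if there is none yet. */
    long onPeriodicEmit() {
        long now = clock.getAsLong();
        if (lastActivityWallClock == Long.MIN_VALUE) {
            // Fresh start or restart: begin the idle clock now, so that
            // downtime is not counted as source idleness.
            lastActivityWallClock = now;
        }
        if (now - lastActivityWallClock >= idleTimeoutMs) {
            // Source has been idle too long: fall back to processing time.
            return now - maxOutOfOrdernessMs;
        }
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events seen since (re)start
        }
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }
}
```

Because the defaults double as the "just restarted" signal, this sketch does not need context.isRestored() at all; that flag would only matter if you also wanted to tell a restore apart from a first-ever start.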