Flink window operator checkpointing


I want to know how Flink checkpoints the window operator. How does it ensure exactly-once semantics when recovering? For example, how does it save the tuples in the current window and the progress of the current window's processing? I want to know the detailed process of the window operator's checkpoint and recovery.


Solution

  • All of Flink's stateful operators participate in the same checkpointing mechanism. When instructed to do so by the checkpoint coordinator (part of the job manager), the task managers initiate a checkpoint in each parallel instance of every source operator. The sources checkpoint their offsets and insert a checkpoint barrier into the stream, dividing the stream into the parts before and after the checkpoint. The barriers flow through the graph, and each stateful operator checkpoints its state once it has processed the stream up to the checkpoint barrier (an operator with several input channels waits until the barrier has arrived on all of them). The details are described at the link shared by @bupt_ljy.
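
    To make the barrier mechanism concrete, here is a toy model in plain Python. It is not Flink's API: `Barrier`, `AligningCounter`, and the two-channel input are hypothetical. It also illustrates barrier alignment for an operator with several inputs: records that arrive on a channel behind an early barrier are held back until the barrier has been seen on every channel, so the snapshot reflects only pre-barrier data.

    ```python
    from dataclasses import dataclass


    @dataclass(frozen=True)
    class Barrier:
        checkpoint_id: int


    class AligningCounter:
        """Toy stateful operator with several input channels; its state is a count."""

        def __init__(self, num_inputs):
            self.num_inputs = num_inputs
            self.count = 0          # the operator's (checkpointed) state
            self.snapshots = {}     # checkpoint_id -> copy of state at the barrier
            self._blocked = set()   # channels whose barrier has already arrived
            self._held = []         # (channel, record) pairs received after a barrier

        def on_element(self, channel, item):
            if isinstance(item, Barrier):
                self._on_barrier(channel, item)
            elif channel in self._blocked:
                # Post-barrier record: it must not affect this checkpoint, so hold it.
                self._held.append((channel, item))
            else:
                self.count += 1

        def _on_barrier(self, channel, barrier):
            self._blocked.add(channel)
            if len(self._blocked) == self.num_inputs:
                # Aligned: the state reflects exactly the pre-barrier records.
                self.snapshots[barrier.checkpoint_id] = self.count
                self._blocked.clear()
                held, self._held = self._held, []
                for ch, item in held:  # release held records in arrival order
                    self.on_element(ch, item)


    # Channel 0 carries a, b, <barrier 1>, c; channel 1 carries x, <barrier 1>, y, z.
    arrivals = [(0, "a"), (1, "x"), (1, Barrier(1)), (1, "y"),
                (0, "b"), (0, Barrier(1)), (0, "c"), (1, "z")]

    op = AligningCounter(num_inputs=2)
    for channel, item in arrivals:
        op.on_element(channel, item)

    print(op.snapshots, op.count)  # {1: 3} 6
    ```

    Checkpoint 1 records 3 (a, b, and x). Without the hold-back step, `y` would be counted before the snapshot and checkpoint 1 would record 4, mixing in a record from after the barrier.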

    Thus these checkpoints capture the entire state of the distributed pipeline, recording offsets into the input queues as well as the state throughout the job graph that has resulted from having ingested the data up to that point. When a failure occurs, the sources are rewound, the state is restored, and processing is resumed.

    Given that during recovery the sources are rewound and replayed, "exactly once" means that the state managed by Flink is affected exactly once, not that the stream elements are processed exactly once.
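
    A single-operator sketch of the recovery path, assuming a replayable in-memory source whose offset is simply a list index (`EVENTS`, `run`, and the every-three-elements checkpoint interval are made up for illustration). Each checkpoint stores the source offset and the operator state together; on failure both are rolled back and the source replays from the saved offset.

    ```python
    # Hypothetical replayable source: an in-memory list where the offset is the index.
    EVENTS = [3, 1, 4, 1, 5, 9, 2, 6]


    def run(fail_at=None):
        """Sum EVENTS, checkpointing every 3 elements; optionally crash once at index fail_at."""
        checkpoint = {"offset": 0, "total": 0}  # last completed checkpoint
        offset, total = 0, 0                    # live source position and operator state
        invocations = 0                         # how often the operator ran (not checkpointed)
        crashed = False
        while offset < len(EVENTS):
            if offset == fail_at and not crashed:
                crashed = True
                # Recovery: rewind the source and restore state from the checkpoint.
                offset, total = checkpoint["offset"], checkpoint["total"]
                continue
            total += EVENTS[offset]
            invocations += 1
            offset += 1
            if offset % 3 == 0:
                # Barrier after every 3rd element: record offset and state together.
                checkpoint = {"offset": offset, "total": total}
        return total, invocations


    print(run())           # (31, 8)
    print(run(fail_at=5))  # (31, 10)
    ```

    The sum is 31 with or without the failure, so the state is affected exactly once, but the operator runs 10 times instead of 8 because the two elements after the last checkpoint are replayed. A side effect outside Flink state (a log line, a write to an external system) would happen twice for those elements.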

    There's nothing particularly special about windows in this regard. Depending on the type of window function being applied, a window's contents are kept in managed ListState, ReducingState, AggregatingState, or FoldingState, scoped to the current key and window. As stream elements arrive and are assigned to a window, they are appended, reduced, aggregated, or folded into that state. Other components of the window API, including Triggers and ProcessWindowFunctions, can have state that is checkpointed as well. For example, CountTrigger uses ReducingState to keep track of how many elements have been assigned to the window, adding one to the count as each element is added to the window.
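
    A rough model of how that state is organized: Flink scopes window state to the current key and window (the window acts as the state's namespace), so checkpointing a window operator amounts to snapshotting this keyed, per-window state. The Python below is a hypothetical stand-in, not Flink code; `WindowStateModel`, `window_for`, and the 10-unit tumbling windows are assumptions, and it keeps list, reduced, and trigger-count state side by side only for comparison.

    ```python
    from collections import defaultdict

    WINDOW_SIZE = 10  # hypothetical tumbling-window length in event-time units


    def window_for(timestamp):
        """Assign a timestamp to its tumbling window, represented as (start, end)."""
        start = timestamp - timestamp % WINDOW_SIZE
        return (start, start + WINDOW_SIZE)


    class WindowStateModel:
        """Toy model of window state, scoped to a (key, window) namespace."""

        def __init__(self):
            # A real operator keeps only the kind its window function needs;
            # all three are kept here side by side for comparison.
            self.list_state = defaultdict(list)    # like ListState: every element
            self.reducing_state = {}               # like ReducingState: running sum
            self.trigger_count = defaultdict(int)  # like CountTrigger's count

        def add(self, key, timestamp, value):
            ns = (key, window_for(timestamp))
            self.list_state[ns].append(value)
            self.reducing_state[ns] = self.reducing_state.get(ns, 0) + value
            self.trigger_count[ns] += 1

        def snapshot(self):
            """Checkpointing copies this state; nothing about it is window-specific."""
            return {
                "list": {ns: list(vs) for ns, vs in self.list_state.items()},
                "reducing": dict(self.reducing_state),
                "count": dict(self.trigger_count),
            }


    state = WindowStateModel()
    for key, ts, value in [("a", 1, 5), ("a", 7, 2), ("b", 3, 4), ("a", 12, 1)]:
        state.add(key, ts, value)

    snap = state.snapshot()
    print(snap["reducing"])
    # {('a', (0, 10)): 7, ('b', (0, 10)): 4, ('a', (10, 20)): 1}
    ```

    Restoring means loading these maps back, so each open window's partial contents (or partial aggregate) and its trigger state come back with it. That is what "saving the tuples in the current window" amounts to.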

    In the case where the window function is a ProcessWindowFunction, all of the elements assigned to the window are saved in Flink state and are passed as an Iterable to the ProcessWindowFunction when the window is triggered. That function iterates over the contents and produces a result. Whatever working state the ProcessWindowFunction builds up during that iteration is not checkpointed; if the job fails while the ProcessWindowFunction is executing, the job resumes from the most recently completed checkpoint. This involves rewinding to a point before the window received the event that triggered the firing (no completed checkpoint can include that event, because the barrier of any checkpoint covering it follows the event in the stream and could not yet have reached the window operator). Sooner or later the window will again reach the point where it is triggered, and the ProcessWindowFunction will be called again, with the same window contents it received the first time, and hopefully this time it won't fail. (Note that I've ignored the case of processing-time windows, which do not behave deterministically.)
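
    The failure-and-retry sequence can be simulated for a single event-time window. This is a simplified Python model, not Flink code: `STREAM`, `process_window`, and `run_with_one_failure` are hypothetical, a watermark plays the role of the triggering element, and the first firing is made to fail on purpose. The window's contents live in checkpointed state, while the function's loop variables do not.

    ```python
    WINDOW_END = 10  # one hypothetical event-time window [0, 10); all events fall in it

    STREAM = [
        ("event", 1, "a"),
        ("event", 4, "b"),
        ("barrier",),        # barrier of a checkpoint that completes
        ("event", 8, "c"),
        ("watermark", 10),   # triggers the window
    ]


    def process_window(elements, fail):
        """Stand-in for a ProcessWindowFunction: iterate over contents, build a result."""
        seen = []  # local working state, never checkpointed
        for e in elements:
            seen.append(e)
            if fail and len(seen) == 2:
                raise RuntimeError("task failed mid-firing")
        return ",".join(seen)


    def run_with_one_failure():
        window_contents = []  # managed ListState for the window
        checkpoint = None     # (stream position, copy of window contents)
        fired_with = []       # contents passed to each firing attempt
        pos, failed_once = 0, False
        while pos < len(STREAM):
            record = STREAM[pos]
            pos += 1
            if record[0] == "event":
                window_contents.append(record[2])
            elif record[0] == "barrier":
                checkpoint = (pos, list(window_contents))
            elif record[0] == "watermark" and record[1] >= WINDOW_END:
                fired_with.append(list(window_contents))
                try:
                    return process_window(window_contents, fail=not failed_once), fired_with
                except RuntimeError:
                    failed_once = True
                    # Recovery: rewind to the checkpoint and restore the window's state.
                    pos, window_contents = checkpoint[0], list(checkpoint[1])
        return None, fired_with


    result, attempts = run_with_one_failure()
    print(result)    # a,b,c
    print(attempts)  # [['a', 'b', 'c'], ['a', 'b', 'c']]
    ```

    After the rollback, `c` and the watermark are replayed, so the second firing receives exactly the contents the first one did.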

    When a ProcessWindowFunction uses managed/checkpointed state, it is used to remember things between firings, not within a single firing. For example, a window that allows late events might want to store the result previously reported, and then issue an update for each late event.
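
    A toy sketch of that late-update pattern, assuming a single key, one window ending at 10, and 5 units of allowed lateness (all hypothetical; the firing and dropping rules only roughly follow Flink's event-time trigger). `last_reported` stands in for per-window state that a real ProcessWindowFunction could obtain from `Context.windowState()`; it is read and written across firings, never used as scratch space within one.

    ```python
    class LateUpdatingWindow:
        """Toy model of one window with allowed lateness, for a single key."""

        def __init__(self, end=10, allowed_lateness=5):
            self.end = end
            self.allowed_lateness = allowed_lateness
            self.contents = []         # window contents (managed state)
            self.last_reported = None  # per-window state kept between firings
            self.watermark = float("-inf")
            self.output = []

        def on_event(self, value):
            # Every value is assumed to belong to this window.
            if self.watermark >= self.end + self.allowed_lateness:
                return  # too late: the window's state is gone, element dropped
            self.contents.append(value)
            if self.watermark >= self.end:
                self._fire()  # late but within allowed lateness: fire again

        def on_watermark(self, time):
            was_before_end = self.watermark < self.end
            self.watermark = time
            if was_before_end and time >= self.end:
                self._fire()  # on-time firing

        def _fire(self):
            # Plays the role of the ProcessWindowFunction's process method.
            total = sum(self.contents)
            if self.last_reported is None:
                self.output.append(("result", total))
            else:
                self.output.append(("update", total, total - self.last_reported))
            self.last_reported = total


    w = LateUpdatingWindow()
    w.on_event(3)
    w.on_event(4)
    w.on_watermark(10)  # on-time firing
    w.on_event(2)       # late, within allowed lateness
    w.on_watermark(15)  # allowed lateness has expired
    w.on_event(7)       # dropped
    print(w.output)     # [('result', 7), ('update', 9, 2)]
    ```

    Because `last_reported` is checkpointed with the rest of the window's state, a late firing after recovery still knows what was reported before and can emit a correct update.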