I have a Flink application that gathers events for a certain amount of time and then sinks them. I originally used an apply function:
.window(TumblingEventTimeWindows.of(Time.minutes(30)))
.apply(windowFunction)
def apply(key: String, window: TimeWindow, input: Iterable[AggEvents], out: Collector[AggEvents]): Unit
I have since realized that a reduce function makes a lot more sense:
.window(TumblingEventTimeWindows.of(Time.minutes(30)))
.reduce(reduceFunction)
override def reduce(value1: AggEvents, value2: AggEvents): AggEvents
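For context, a minimal sketch of what that reduce does, assuming a hypothetical AggEvents that simply accumulates an event count (the real fields are not shown here):

```scala
// Hypothetical shape of AggEvents; the real class is not shown in the question.
case class AggEvents(key: String, count: Long)

// A ReduceFunction pre-aggregates pairwise, so Flink keeps only one
// AggEvents per key and window instead of buffering every element.
def reduce(value1: AggEvents, value2: AggEvents): AggEvents =
  AggEvents(value1.key, value1.count + value2.count)
```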
The application has been running for quite some time now. The question is: will this change break it due to the difference in the snapshot structure?
Any change you make will need to be deployed with a stop and a restart. When a Flink job is stopped for a new deployment, a snapshot (savepoint) is created. You then have the option of restarting the application from that savepoint; this is the only way to ensure that the application resumes in a state as close as possible to the moment it was stopped.
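As a sketch, with placeholder job id, paths, and jar name, that stop-and-resume cycle with the Flink CLI looks like this:

```shell
# Stop the job and take a savepoint in one step
# (<jobId> and the savepoint directory are placeholders)
flink stop --savepointPath /savepoints <jobId>

# Deploy the new version, resuming from the savepoint just written
flink run -s /savepoints/<savepoint-dir> my-app.jar
```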
When restarting from the previous savepoint, several rules must be followed for the state to be restored. For example, stateful operators should keep stable uids so Flink can match the state in the savepoint to the operators in the new job, and an operator's state type cannot change (switching a window from apply, which buffers all window elements in list state, to reduce, which keeps a single reducing value, is exactly such a change). There are more rules than these; the Flink documentation on upgrading applications is the full list of how a Flink application's topology affects migration.
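One of those rules in practice: give each stateful operator a stable uid so Flink can map savepoint state to the operators of the upgraded job. A sketch against the question's pipeline (the uid strings and field/sink names are arbitrary illustrations, not from the question):

```scala
stream
  .keyBy(_.key)  // keying assumed; the question does not show it
  .window(TumblingEventTimeWindows.of(Time.minutes(30)))
  .reduce(reduceFunction)
  .uid("agg-events-30m-window")  // stable id for matching savepoint state
  .addSink(sink)
  .uid("agg-events-sink")
```

Note that a stable uid lets Flink find the window operator's state in the savepoint, but it does not make incompatible state types restorable.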
If your application has several operators and state restoration is critical, my suggestion is to test your change by deploying the new version from the saved savepoint. If the state cannot be restored, you can roll back by redeploying the previous version from the same savepoint.
This question has a more in-depth discussion of how a sink's state is restored from checkpoints:
Apache Flink: Do sinks store items buffered from stream during checkpoint into checkpoint state?