I ran into a code bug in my Kafka-Flink application that forced me to redeploy the whole Flink application. I know about checkpoints and savepoints, but since the application had to be restarted, it lost all of Flink's internal checkpoints, and I hadn't created any savepoints.
However, my commit.offsets.on.checkpoint flag is true, so even if Flink's internal checkpoints are lost, the Kafka broker should still have the last committed offset.
So in this case, will my restarted Flink application resume from the offset committed to the Kafka broker, or will it start again from the very beginning of the Kafka topic because it has lost all its internal checkpoints?
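If Flink is restarting from a checkpoint or savepoint, then Flink will use the offsets in that snapshot. Otherwise it will use the starting position configured in the KafkaSource or FlinkKafkaConsumer. If you haven't explicitly configured a starting position, then the KafkaSource will start from the earliest offset; as far as I recall, the legacy FlinkKafkaConsumer defaulted to the consumer group's committed offsets. So the offsets committed to the broker are only used if the source's starting position is set to the committed offsets, either explicitly or by that default.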
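To make the rule above concrete, here is a minimal plain-Java model of it. This is only a sketch, not Flink code: the class `StartOffsetSketch`, the enum `StartingPosition`, and the method `resolveStartOffset` are hypothetical names invented for illustration. In the real connector, the equivalent choice is made with `KafkaSource.builder().setStartingOffsets(...)`, for example `OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)` to resume from the broker's committed offsets and fall back to the earliest offset when none exist.

```java
import java.util.Optional;

// Hypothetical model of how the starting offset is chosen; not part of Flink.
class StartOffsetSketch {

    // Stand-in for the starting position configured on the Kafka source.
    enum StartingPosition { EARLIEST, LATEST, COMMITTED_GROUP_OFFSETS }

    // Offsets stored in a restored checkpoint/savepoint always win.
    // Without a snapshot, the configured starting position decides.
    static long resolveStartOffset(Optional<Long> snapshotOffset,
                                   StartingPosition configured,
                                   long earliest,
                                   long latest,
                                   Optional<Long> committedOnBroker) {
        if (snapshotOffset.isPresent()) {
            return snapshotOffset.get();
        }
        switch (configured) {
            case LATEST:
                return latest;
            case COMMITTED_GROUP_OFFSETS:
                // Assumption: fall back to earliest when nothing was committed.
                return committedOnBroker.orElse(earliest);
            default:
                return earliest;
        }
    }

    public static void main(String[] args) {
        // Fresh redeploy without any snapshot, source configured for committed offsets.
        long start = resolveStartOffset(Optional.empty(),
                StartingPosition.COMMITTED_GROUP_OFFSETS, 0L, 500L, Optional.of(100L));
        System.out.println("start offset = " + start);
    }
}
```

In the scenario from the question (no snapshot to restore), the committed offset is only picked up in the `COMMITTED_GROUP_OFFSETS` branch; with the `EARLIEST` setting the job would re-read the topic from the start.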
If you want to restart from a checkpoint or savepoint while ignoring the saved offsets (in order to pick up the rest of the saved state), then change the UID on the Kafka source operator, configure a specific starting position, and set the --allowNonRestoredState flag (since there will be saved offsets associated with the old UID that can't be restored).
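The reason the flag is needed can be illustrated with a small plain-Java model of how snapshot state is matched to operators by UID. Again this is only a sketch under assumptions: `RestoreSketch`, `restore`, and the UID strings are hypothetical and do not exist in Flink. In a real job the UID is set with `.uid("...")` on the source operator, and the flag is passed on the command line, e.g. `flink run -s <savepointPath> --allowNonRestoredState ...`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of UID-based state restore; not part of Flink.
class RestoreSketch {

    // Returns the state that could be mapped to an operator in the new job.
    // State whose UID no longer exists is skipped only if allowNonRestoredState is set.
    static Map<String, String> restore(Map<String, String> snapshotStateByUid,
                                       Set<String> jobOperatorUids,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> entry : snapshotStateByUid.entrySet()) {
            if (jobOperatorUids.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "No operator for saved state with UID " + entry.getKey());
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        // Snapshot taken with the old source UID (illustrative values).
        Map<String, String> snapshot = Map.of(
                "kafka-source-v1", "saved offsets",
                "aggregator", "saved window state");
        // New job: the source UID was changed, the aggregator kept its UID.
        Set<String> newJobUids = Set.of("kafka-source-v2", "aggregator");

        System.out.println(restore(snapshot, newJobUids, true).keySet());
    }
}
```

With the flag set, the aggregator state is restored while the offsets saved under the old source UID are dropped, so the source falls back to its configured starting position. Without the flag, the same restore fails because the old offsets have nowhere to go.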