I am running a Flink job that reads data from Kafka, processes it into a Flink Row object, and writes it to an Iceberg Sink. To deploy new code changes, I restart the job from the latest savepoint, which captures the state of the Kafka source. That savepoint stores the most recent Kafka offsets the job has read, so the next run continues from where the previous one left off and failure recovery is transparent. Recently, I hit a scenario where data was missing from the final output after a job restart. Here's what happened:
Is there a workaround for this problem, or is it an inherent limitation of the Iceberg Sink?
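For context, the job is wired up roughly like the sketch below. This is a minimal reconstruction rather than the production code: the topic name, table location, schema, and the string-to-Row mapping are all placeholders, and the exact FlinkSink builder methods can differ between Iceberg releases.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class KafkaToIcebergJob {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpoints/savepoints are what carry the Kafka offsets across restarts.
    env.enableCheckpointing(60_000L);

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
    kafkaProps.setProperty("group.id", "iceberg-writer");

    DataStream<String> raw =
        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), kafkaProps));

    // Parse each Kafka record into a Flink Row matching the Iceberg table schema.
    DataStream<Row> rows =
        raw.map(value -> Row.of(value))
            .returns(Types.ROW(Types.STRING));

    TableSchema schema =
        TableSchema.builder()
            .field("data", DataTypes.STRING())
            .build();

    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://namenode/warehouse/db/events");

    // Iceberg sink: a writer operator plus a committer operator that commits
    // the written data files when a checkpoint completes.
    FlinkSink.forRow(rows, schema)
        .tableLoader(tableLoader)
        .build();   // .append() in newer Iceberg releases

    env.execute("kafka-to-iceberg");
  }
}

The important part for what follows is that the sink's committer only makes data files visible in the Iceberg table once a checkpoint completes.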
Leaving an update here for others who might run into this in the future, as it was rather tricky to debug. We are currently running Iceberg 0.11.x on Flink 1.11. In these versions the Iceberg sink does not explicitly set operator UIDs; that bug was fixed in #2745.
If you don't explicitly set the UID and your job graph changes while deploying a change from a checkpoint, Flink cannot restore the previous sink operator state, specifically the committer operator state. You can force the job to start anyway with the --allowNonRestoredState flag, but that state is important: during restore, the sink uses it to verify whether the data files from the last completed checkpoint were actually committed to the Iceberg table. Dropping it via --allowNonRestoredState can therefore lose data, because the Iceberg commit for that checkpoint may have failed and, without the restored state, it will never be retried.
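Once you are on an Iceberg release that includes the fix, the sink builder lets you pin the operator UIDs yourself, which is the clean way to avoid the problem. Below is a minimal sketch, assuming the uidPrefix builder option from that fix is available in your version; the method name and the prefix value should be checked against the release you actually run.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergSinkWithStableUid {

  // Attach the Iceberg sink with a fixed UID prefix so the writer/committer
  // operator state can be matched on restore even if the job graph changes.
  // uidPrefix(...) is the builder option added by the upstream fix; verify
  // that your Iceberg version includes it. The prefix string is an arbitrary,
  // stable identifier chosen by the job author.
  static void attachSink(DataStream<Row> rows, TableSchema schema, TableLoader tableLoader) {
    FlinkSink.forRow(rows, schema)
        .tableLoader(tableLoader)
        .uidPrefix("iceberg-sink-events")   // keep this constant across deployments
        .build();                           // .append() in newer Iceberg releases
  }
}

With the UIDs pinned, a job-graph change no longer orphans the committer state, so you can restore normally without --allowNonRestoredState and the record of which data files were actually committed is preserved.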