When checkpointing is enabled in a Flink job that writes to Cassandra, and a write fails due to a connection issue, the job fails and restarts after the configured restart delay.
Where does the job resume after such a failure? Does it skip ahead to the next record, or does it reset the offsets and reprocess the record that failed?
My checkpointing configuration looks like this:
try {
    // RocksDB state backend; second argument enables incremental checkpoints
    env.setStateBackend(new RocksDBStateBackend(props.getFlinkCheckpointDataUri(), true));
    // Checkpoint every 10 seconds in exactly-once mode
    env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
} catch (Exception e) {
    System.out.println("Failed to prepare stream execution environment");
}
With checkpointing enabled, Flink guarantees at-least-once delivery for the Cassandra sink, and exactly-once if your update requests to the C* instance are idempotent, meaning they can be applied multiple times without changing the result [ref]. In other words, if one or more records fail to be written, the checkpoint whose snapshot contains those records is not committed. As a result, all records belonging to the failed checkpoint are replayed in full.
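The idempotency condition is the crux of the exactly-once claim, so here is a minimal, self-contained sketch of it (a plain Java `Map` standing in for a Cassandra table; the keys and values are made up for illustration). An absolute write survives a replay unchanged, while an increment double-counts:

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotencyDemo {
    public static void main(String[] args) {
        // Stand-in for a C* table: row key -> column value.
        Map<String, Integer> table = new HashMap<>();

        // Idempotent update: set a column to an absolute value.
        // Replaying it after a failure leaves the row unchanged.
        table.put("user-42", 100);
        table.put("user-42", 100); // replay after recovery
        System.out.println(table.get("user-42")); // 100

        // Non-idempotent update: an increment (e.g. a C* counter column).
        // Replaying it double-counts, so only at-least-once holds.
        table.merge("page-views", 1, Integer::sum);
        table.merge("page-views", 1, Integer::sum); // replay after recovery
        System.out.println(table.get("page-views")); // 2
    }
}
```

This is why plain INSERT/UPDATE statements keyed by primary key behave as upserts and tolerate replays, whereas counter updates do not.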
This works because the Cassandra sink uses a checkpoint committer that stores additional information about completed checkpoints in an external resource. That information is used to avoid a full replay of the last completed checkpoint after a failure [ref].
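The committer idea can be sketched as follows. This is not Flink's actual `CheckpointCommitter` API, just a hypothetical illustration of the bookkeeping: the sink records the highest checkpoint ID whose data is known to be fully written (Flink's Cassandra committer keeps this in a C* table), and after a restart it skips records from checkpoints at or below that ID:

```java
import java.util.HashMap;
import java.util.Map;

public class CommitterSketch {
    // Stand-in for the external resource holding commit metadata,
    // keyed by a sink/operator instance id (names are illustrative).
    private final Map<String, Long> committed = new HashMap<>();

    // Called once all of a checkpoint's records are durably written.
    void commitCheckpoint(String operatorId, long checkpointId) {
        committed.put(operatorId, checkpointId);
    }

    // After recovery: data from committed checkpoints need not be rewritten.
    boolean isCommitted(String operatorId, long checkpointId) {
        return committed.getOrDefault(operatorId, -1L) >= checkpointId;
    }

    public static void main(String[] args) {
        CommitterSketch committer = new CommitterSketch();
        committer.commitCheckpoint("sink-0", 7);
        // Restarted job: checkpoint 7's data is skipped, 8's is replayed.
        System.out.println(committer.isCommitted("sink-0", 7)); // true
        System.out.println(committer.isCommitted("sink-0", 8)); // false
    }
}
```

So after a failure, only the records of the checkpoint that never committed are written again, which is where the at-least-once (and, with idempotent updates, effectively exactly-once) behavior comes from.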