cassandra, apache-flink, flink-streaming

Checkpointing with Cassandra write


When checkpointing is enabled in a Flink job that writes to Cassandra, and a write fails due to a connection issue, the job fails and restarts after a certain time interval.

Where does the job resume from when a record fails? Does it pick up with the next record, or does it reset the offset and reprocess the failed record?

My checkpointing configuration looks like this:

try {
    // Use the RocksDB state backend with incremental checkpointing enabled
    env.setStateBackend(new RocksDBStateBackend(props.getFlinkCheckpointDataUri(), true));
    env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); // checkpoint every 10 seconds

    CheckpointConfig config = env.getCheckpointConfig();
    // Retain externalized checkpoints if the job is cancelled
    config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
} catch (Exception e) {
    System.out.println("Failed to prepare stream execution environment");
}

Solution

  • Flink guarantees at-least-once delivery for the Cassandra sink when checkpointing is enabled, and exactly-once if your update requests to the C* instance are idempotent, i.e. the updates can be applied multiple times without changing the result [ref]. In other words, if any record fails to execute, the checkpoint whose snapshot contains that record is not committed, and all records covered by the failed checkpoint are retried in full.

    This works because the Cassandra sink ships with a checkpoint committer that stores additional information about completed checkpoints in some resource. That information is used to prevent a full replay of the last completed checkpoint in case of a failure [ref]. A sketch of a sink that puts both points into practice follows.
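
    As a minimal sketch of what an idempotent write looks like: the results stream, the host, and the example.wordcount table (with word as the primary key) below are all assumptions for illustration, not part of the original job. Because the INSERT is keyed on the primary key, replaying the records of a failed checkpoint simply overwrites the same rows, which is what upgrades the guarantee to exactly-once. The optional enableWriteAheadLog() call activates the checkpoint committer mentioned above.

        import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

        // "results" is an assumed DataStream<Tuple2<String, Long>> produced
        // earlier in the job. The INSERT below is an upsert on the primary
        // key ("word"), so applying it twice yields the same row: the write
        // is idempotent and therefore safe to replay after a restore.
        CassandraSink.addSink(results)
                .setQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);")
                .setHost("127.0.0.1")
                // Optional: route writes through a write-ahead log so the
                // checkpoint committer can record which checkpoints were
                // already committed and skip them on recovery.
                .enableWriteAheadLog()
                .build();

    Even without the write-ahead log the sink still gives at-least-once delivery; an idempotent query alone is often enough in practice.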