Search code examples
apache-flinkflink-streamingflink-cepflink-sql

Checkpointing in Flink is not working with CoFlatMapFunction


Hi i am trying to do checkpointing in one of my flink module in which i am using CoFlatMapFunction to combine to streams if i comment out the CoFlatMapFunction checkpointing is working if uncomment again its not working. i updated the Checkpointing as this documentation in flink website in which it says for iterative streams there is an extra attribute added to force the checkpoint event after doing that also its not working please find below for the checkpoint settings

StateBackend stateBackend = new RocksDBStateBackend(path, true);

//env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);

env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE,true);

 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

 env.getCheckpointConfig().setCheckpointTimeout(120000);
 
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
 env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

 env.setStateBackend(stateBackend);

I can see on of the task status finished but i am unable to see the logs since

enter image description here


Solution

  • I believe the reason for this is FLINK-2491: checkpointing only works if all operators/tasks are still running.

    You should replace the source that is injecting some data from a Collection with some other source that won't just instantly transition to being finished, perhaps a custom source that keeps the source alive once it runs out of data to emit, but doing nothing.