Tags: apache-kafka, apache-flink, flink-streaming

Flink with checkpointing doesn't die after Kafka disconnection


My Flink streaming job (with checkpointing enabled) reads data from a Kafka source and prints it (a very simple job).
If I kill the source Kafka,

  • What I expect: the Flink job dies (because it has lost its source)
  • What happens: the Flink job doesn't die and keeps retrying to connect to the source Kafka

I want my Flink job to die after the disconnection.
Even though I set a restart strategy, the job doesn't die.
How can I kill my Flink job after a Kafka disconnection?

FYI:

  • My Flink version: 1.14.4
  • Checkpoint settings (in Scala):

...
// At-least-once checkpointing with a 1-hour timeout per checkpoint
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(TimeUnit.HOURS.toMillis(1))
// Keep externalized checkpoints when the job is cancelled
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// Fail the job on the first checkpoint failure
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(0)
// Compress state snapshots
env.getConfig.setUseSnapshotCompression(true)
...

Thanks in advance!


Solution

  • The checkpointing settings aren't relevant in this case: they are tied specifically to making checkpoints, so they only matter when a checkpoint fails.

    In your use case, some time will pass before Flink can detect that the Kafka brokers are no longer available; Flink will not stop immediately unless it tries to connect to Kafka at that point and the attempt fails. If you don't want Flink to restart after a failure, you'll need to set the restart strategy to none, as sketched below. See https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/task_failure_recovery/#no-restart-strategy
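    A minimal sketch of the programmatic variant (Scala API, Flink 1.14, assuming the same env as in your checkpoint snippet):

    ...
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    ...
    // No-restart strategy: the first task failure that reaches the JobManager
    // (e.g. when the Kafka source eventually surfaces the connection failure)
    // moves the job to FAILED instead of restarting it.
    env.setRestartStrategy(RestartStrategies.noRestart())
    ...

    The equivalent cluster-wide setting in flink-conf.yaml is restart-strategy: none.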