apache-flink, flink-streaming, flink-cep

Flink missing state values on k8s: resuming a job after the jobmanager/taskmanager crashes


While the Flink job cluster (deployment/pod) was running on Kubernetes, we deleted the jobmanager and taskmanager pods (kubectl delete pod XXX). After the new pods were up and running normally, we found that the state from the previous pods was missing, even though the RocksDB and checkpoint file paths are mounted from a PVC. Any suggestions for restoring the state once the pods are running? I double-checked the code and found that checkpointing is not enabled. Is that the root cause of the job not being able to resume?

The environment settings are below:

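    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.StateBackend;

    // checkpoints go to <checkPointDataUri>/checkpoint; 'true' enables incremental checkpoints
    RocksDBStateBackend backend = new RocksDBStateBackend(checkPointDataUri + "/checkpoint", true);
    // local working directory for RocksDB
    backend.setDbStoragePath(checkPointDataUri + "/RocksDB");
    backend.setNumberOfTransferingThreads(1);

    // add state backend
    env.setStateBackend((StateBackend) backend);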

Can we enable checkpointing like this?

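    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;

    // trigger a checkpoint every 1000 ms
    env.enableCheckpointing(1000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // retain externalized checkpoints when the job is cancelled
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);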

Below is the log from the restart:

2020-06-09 06:48:11,921 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,921 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,962 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,941 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,962 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,941 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,963 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,921 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,963 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,963 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,963 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,942 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,965 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,961 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,965 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,942 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}
2020-06-09 06:48:11,981 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Configuring application-defined state backend with job/cluster config
2020-06-09 06:48:11,944 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/data/ss-kpi-ewfj8/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=[/opt/flink/data/ss-kpi-ewfj8/RocksDB], enableIncrementalCheckpointing=TRUE, numberOfTransferingThreads=1}

Solution

  • It doesn't make sense to store RocksDB's working files and the checkpoints on the same file system. RocksDB should use the fastest available local file system; Kubernetes ephemeral storage is fine. The checkpoints, on the other hand, must be stored somewhere durable, in some sort of distributed file system.
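Restoring also requires that checkpoints exist in the first place: with checkpointing disabled, as in the question, nothing is written to the checkpoint directory. Once checkpointing is enabled with RETAIN_ON_CANCELLATION, Flink's file-based checkpoint storage normally writes each completed checkpoint to a chk-<n> subdirectory, containing a _metadata file, under <checkpoint-dir>/<job-id>/. If only a TaskManager fails, the JobManager restores from the latest completed checkpoint by itself; if the JobManager pod is deleted and high availability is not configured, the restarted job generally has to be pointed at a retained checkpoint explicitly (for example through the job cluster's --fromSavepoint argument). The sketch below is a hypothetical, JDK-only helper, LatestCheckpoint.find, that picks the newest complete chk-<n> directory; the directory layout it assumes should be checked against your Flink version.

```java
import java.io.File;
import java.util.Optional;

/** Hypothetical helper: finds the newest retained checkpoint under a job's checkpoint directory. */
public final class LatestCheckpoint {

    private static final String PREFIX = "chk-";

    private LatestCheckpoint() {}

    /**
     * Returns the chk-N directory with the highest N that contains a _metadata file,
     * or empty if there is none. jobCheckpointDir is assumed to be <checkpoint-dir>/<job-id>.
     */
    public static Optional<File> find(File jobCheckpointDir) {
        File[] children = jobCheckpointDir.listFiles();
        if (children == null) {
            return Optional.empty(); // not a directory, missing, or not readable
        }
        File best = null;
        long bestId = -1;
        for (File child : children) {
            String name = child.getName();
            if (!child.isDirectory() || !name.startsWith(PREFIX)) {
                continue; // skip e.g. "shared" and "taskowned"
            }
            long id;
            try {
                id = Long.parseLong(name.substring(PREFIX.length()));
            } catch (NumberFormatException e) {
                continue; // ignore directories that only look like checkpoints
            }
            // Only count checkpoints whose metadata file was actually written.
            if (id > bestId && new File(child, "_metadata").isFile()) {
                best = child;
                bestId = id;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        File dir = new File(args.length > 0 ? args[0] : ".");
        System.out.println(find(dir).map(File::getPath).orElse("no retained checkpoint found"));
    }
}
```

For example, running it against /opt/flink/data/ss-kpi-ewfj8/checkpoint/<job-id> prints the directory to restore from. Skipping directories without _metadata is meant to avoid picking a checkpoint that was still in progress when the pod died.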