I'm using a Flink application that stores keyed state on a PVC mounted in a Kubernetes pod. The mounted disk is UltraSSD_LRS on Azure. The configuration for the Flink job is as follows:
state.backend: rocksdb
state.backend.rocksdb.localdir: "/data/flink/state/local
state.checkpoints.dir: abfs://XXXXXXX/flink-checkpointing/
Inside the code, I'm also setting the DB storage path as shown:
EmbeddedRocksDBStateBackend embeddedRocksDb = new EmbeddedRocksDBStateBackend(true);
embeddedRocksDb.setDbStoragePath("file:///data/flink/state/job/");
env.setStateBackend(embeddedRocksDb);
env.enableCheckpointing(5000);
The disk is mounted properly and data is being written, but after some time the application throws a RocksDBException:
Caused by: org.apache.flink.util.SerializedThrowable: org.rocksdb.RocksDBException: while link file to /data/flink/state/job/job_cb48da41cb620a68170593dc09789f2b_op_KeyedCoProcessOperator_1c449adacf198fac8b664046293f1fdf__2_4__uuid_4c6051b6-b588-4bcb-a740-fe4ddb01beae/chk-8.tmp/000014.sst: /data/flink/state/job/job_cb48da41cb620a68170593dc09789f2b_op_KeyedCoProcessOperator_1c449adacf198fac8b664046293f1fdf__2_4__uuid_4c6051b6-b588-4bcb-a740-fe4ddb01beae/db/000014.sst: Operation not supported
at org.rocksdb.Checkpoint.createCheckpoint(Native Method) ~[flink-dist-1.17.1.jar:1.17.1]
at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:170) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:156) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:76) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:593) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:246) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:715) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:350) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244) ~[flink-dist-1.17.1.jar:1.17.1]
... 23 more
I'm not sure what the problem is, but I would start by moving all of the configuration out of the code. That's not necessary, but it would be cleaner.
In particular, these two items are both configuring the same thing, but don't set the same value, so that's confusing:
state.backend.rocksdb.localdir: "/data/flink/state/local
embeddedRocksDb.setDbStoragePath("file:///data/flink/state/job/");
The other piece of configuration is enabling checkpointing, which could be done via
execution.checkpointing.interval: 5000
Another factor to consider is that the EmbeddedRocksDBStateBackend doesn't need a persistent volume. It can use ephemeral storage, since Flink relies on checkpoints for recovery, not the local disk.
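One more thing that may be worth checking: the "while link file to ... Operation not supported" message in the stack trace suggests that a hard link from the db directory into chk-8.tmp failed, so the mounted filesystem may not accept hard links. The following is only a minimal diagnostic sketch using the standard Java NIO API, not anything Flink provides; the class name HardLinkCheck and the method supportsHardLinks are made up for illustration, and the directory to test is whatever path you pass in.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flink or RocksDB.
public class HardLinkCheck {

    // Returns true if a hard link can be created inside the given directory.
    public static boolean supportsHardLinks(Path dir) throws IOException {
        // Create a throwaway file, then try to hard-link it within the same directory.
        Path source = Files.createTempFile(dir, "linkcheck", ".src");
        Path link = source.resolveSibling(source.getFileName() + ".lnk");
        try {
            Files.createLink(link, source);
            return true;
        } catch (UnsupportedOperationException | IOException e) {
            // The filesystem (or its mount options) rejected the hard link.
            return false;
        } finally {
            // Clean up both files regardless of the outcome.
            Files.deleteIfExists(link);
            Files.deleteIfExists(source);
        }
    }

    public static void main(String[] args) throws IOException {
        // Pass the directory to test as the first argument; defaults to the JVM temp dir.
        Path dir = Paths.get(args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir"));
        System.out.println(dir + " supports hard links: " + supportsHardLinks(dir));
    }
}
```

Running this inside the pod against the directory used for the RocksDB files (for example /data/flink/state/job) would show whether the volume itself rejects hard links. If it does, pointing the local RocksDB directory at ephemeral storage, as suggested above, would sidestep the problem.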