azure apache-flink rocksdb

Exception while writing Flink keyed state with RocksDB to an Azure UltraSSD_LRS disk


I'm using a Flink application that stores its keyed state on a PVC mounted in a Kubernetes pod. The PVC is backed by an UltraSSD_LRS disk on Azure. The Flink job is configured as follows:

  state.backend: rocksdb
  state.backend.rocksdb.localdir: "/data/flink/state/local"
  state.checkpoints.dir: abfs://XXXXXXX/flink-checkpointing/

Inside the code, I'm also setting the DB storage path as shown:

EmbeddedRocksDBStateBackend embeddedRocksDb = new EmbeddedRocksDBStateBackend(true);
embeddedRocksDb.setDbStoragePath("file:///data/flink/state/job/");
env.setStateBackend(embeddedRocksDb);
env.enableCheckpointing(5000);

The disk is mounted properly and data is being written, but after some time the application throws a RocksDB exception:

Caused by: org.apache.flink.util.SerializedThrowable: org.rocksdb.RocksDBException: while link file to /data/flink/state/job/job_cb48da41cb620a68170593dc09789f2b_op_KeyedCoProcessOperator_1c449adacf198fac8b664046293f1fdf__2_4__uuid_4c6051b6-b588-4bcb-a740-fe4ddb01beae/chk-8.tmp/000014.sst: /data/flink/state/job/job_cb48da41cb620a68170593dc09789f2b_op_KeyedCoProcessOperator_1c449adacf198fac8b664046293f1fdf__2_4__uuid_4c6051b6-b588-4bcb-a740-fe4ddb01beae/db/000014.sst: Operation not supported
at org.rocksdb.Checkpoint.createCheckpoint(Native Method) ~[flink-dist-1.17.1.jar:1.17.1]
at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:170) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:156) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:76) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:593) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:246) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:715) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:350) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244) ~[flink-dist-1.17.1.jar:1.17.1]
... 23 more

Solution

  • I'm not sure what the problem is, but I would start by moving all of the configuration out of the code. That's not necessary, but it would be cleaner.

    In particular, these two settings configure the same thing (the local directory where RocksDB keeps its working files) but are set to different values, which is confusing:

    state.backend.rocksdb.localdir: "/data/flink/state/local"
    
    embeddedRocksDb.setDbStoragePath("file:///data/flink/state/job/");
    

    The other piece of configuration set in the code is the checkpoint interval, which could instead be set via

    execution.checkpointing.interval: 5000
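
    Putting those pieces together, the whole setup could live in the Flink configuration file. This is only a sketch under assumptions: `state.backend.incremental: true` is assumed to mirror the `true` passed to `new EmbeddedRocksDBStateBackend(true)`, and keeping the `localdir` value rather than the path set in code is an arbitrary choice, since only one of the two should remain.

    ```yaml
    # flink-conf.yaml (sketch, not a verified fix for the exception)
    state.backend: rocksdb
    # Assumption: equivalent of new EmbeddedRocksDBStateBackend(true)
    state.backend.incremental: true
    # Assumption: keep ONE working directory; this is the value from the question's config
    state.backend.rocksdb.localdir: /data/flink/state/local
    state.checkpoints.dir: abfs://XXXXXXX/flink-checkpointing/
    # Interval in milliseconds, same as env.enableCheckpointing(5000)
    execution.checkpointing.interval: 5000
    ```

    With this in place, the job code would no longer need the `setDbStoragePath`, `setStateBackend`, or `enableCheckpointing` calls.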
    

    Another factor to consider is that the EmbeddedRocksDBStateBackend doesn't need a persistent volume -- it can use ephemeral storage, since Flink relies on checkpoints for recovery, and not the local disk.
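
One thing that may help narrow down the exception itself: the message says the failure happens "while link file to ... Operation not supported", and RocksDB's native checkpoint hard-links SST files from the `db` directory into the `chk-N.tmp` directory. It may therefore be worth checking whether the mounted volume accepts hard links at all. The following is a minimal diagnostic sketch, not a confirmed root cause; the class name `HardLinkProbe` and the method `supportsHardLinks` are hypothetical, and the default path is simply the one from the question.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class HardLinkProbe {

    /**
     * Tries to create a hard link inside the given directory.
     * Returns true if the link was created; false if the attempt failed
     * (which may also happen for other reasons, e.g. missing permissions).
     */
    static boolean supportsHardLinks(Path dir) throws IOException {
        // Create a throwaway file to link to, in the directory under test.
        Path target = Files.createTempFile(dir, "probe-", ".sst");
        Path link = dir.resolve(target.getFileName() + ".link");
        try {
            Files.createLink(link, target);
            return true;
        } catch (UnsupportedOperationException | IOException e) {
            System.err.println("Hard link failed: " + e);
            return false;
        } finally {
            // Clean up both files regardless of the outcome.
            Files.deleteIfExists(link);
            Files.deleteIfExists(target);
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: default to the storage path from the question.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/data/flink/state/job");
        System.out.println(dir + " supports hard links: " + supportsHardLinks(dir));
    }
}
```

Running this inside the TaskManager pod against the RocksDB directory would show whether a plain hard link succeeds there. If it fails, pointing the RocksDB local directory at ephemeral pod storage, as suggested above, would be one way to sidestep the problem.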