I have read about `EmbeddedRocksDBStateBackend` in Flink 1.13, but it seems to have size limitations, so I would like to keep the configuration from my previous Flink version (1.11). The problem is that this way of configuring RocksDB is deprecated (`new RocksDBStateBackend("path", true);`).
I have tried the new configuration using `EmbeddedRocksDBStateBackend` (`new EmbeddedRocksDBStateBackend(true)`) and I get this error:
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
What is the best way to configure the RocksDB state backend for Flink 1.13 programmatically from Java?
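In Flink 1.13 we reorganized the state backends because the old way had resulted in many misunderstandings about how things work. So two concerns were decoupled: where your working state is kept (the state backend) and where checkpoints are stored (the checkpoint storage).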
With the old API, the fact that two different filesystems are involved in the case of RocksDB was obscured by the way the checkpointing path was passed to the `RocksDBStateBackend` constructor. So that bit of configuration has been moved elsewhere (see below).
This table shows the relationships between the legacy state backends and the new ones (in combination with checkpoint storage):
| Legacy State Backend | New State Backend + Checkpoint Storage |
|---|---|
| MemoryStateBackend | HashMapStateBackend + JobManagerCheckpointStorage |
| FsStateBackend | HashMapStateBackend + FileSystemCheckpointStorage |
| RocksDBStateBackend | EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage |
In your case you want to use the `EmbeddedRocksDBStateBackend` with `FileSystemCheckpointStorage`. The problem you are currently having is that you are using in-memory checkpoint storage (`JobManagerCheckpointStorage`) with RocksDB, which severely limits how much state can be checkpointed: the `maxSize=5242880` in your error is a 5 MiB cap, and your state (`Size=9126648`) is well over it.
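To make the limit concrete, here is a small standalone sketch in plain Java (no Flink dependency) that redoes the arithmetic from the exception message. The class name, constant names, and the `fits` helper are made up for illustration and are not Flink APIs; the two numbers are copied from the error above.

```java
// Illustrative only: names here are hypothetical, not part of Flink.
class CheckpointSizeCheck {
    // maxSize from the exception message; equals 5 * 1024 * 1024 bytes (5 MiB).
    static final long MAX_SIZE_BYTES = 5_242_880L;
    // Size from the exception message.
    static final long STATE_SIZE_BYTES = 9_126_648L;

    // Returns true if a snapshot of the given size stays within the limit.
    static boolean fits(long stateSizeBytes, long maxSizeBytes) {
        return stateSizeBytes <= maxSizeBytes;
    }

    public static void main(String[] args) {
        System.out.println(MAX_SIZE_BYTES == 5L * 1024 * 1024);     // true
        System.out.println(fits(STATE_SIZE_BYTES, MAX_SIZE_BYTES)); // false
        System.out.println(STATE_SIZE_BYTES - MAX_SIZE_BYTES);      // 3883768 bytes over
    }
}
```

The snapshot is roughly 3.7 MiB over the cap, which is why the checkpoint fails regardless of how RocksDB itself is tuned.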
You can fix this either by specifying a checkpoint directory in `flink-conf.yaml`:
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
or by setting it in your code:
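import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// To keep incremental checkpoints (the `true` in the legacy constructor), use new EmbeddedRocksDBStateBackend(true).
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));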
See the docs on Migrating from Legacy Backends for complete details.