
Configure RocksDB in Flink 1.13


I have read about the EmbeddedRocksDBStateBackend in Flink 1.13, but it seems to have size limitations, so I want to keep the configuration from my previous Flink version (1.11). The problem is that the way I configured RocksDB there (new RocksDBStateBackend("path", true);) is now deprecated.

I have tried the new configuration using EmbeddedRocksDBStateBackend (new EmbeddedRocksDBStateBackend(true)) and I get this error:

java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.

What is the best way to configure the RocksDB state backend for Flink 1.13 programmatically from Java?


Solution

  • In Flink 1.13 we reorganized the state backends because the old way had resulted in many misunderstandings about how things work. So these two concerns were decoupled:

    1. Where your working state is stored (the state backend). (In the case of RocksDB, it should be configured to use the fastest available local disk.)
    2. Where checkpoints are stored (the checkpoint storage). In most cases, this should be a distributed filesystem.

    With the old API, the fact that two different filesystems are involved in the case of RocksDB was obscured by the way the checkpointing path was passed to the RocksDBStateBackend constructor. So that bit of configuration has been moved elsewhere (see below).

    This table shows the relationships between the legacy state backends and the new ones (in combination with checkpoint storage):

    Legacy State Backend    New State Backend + Checkpoint Storage
    --------------------    --------------------------------------------------------
    MemoryStateBackend      HashMapStateBackend + JobManagerCheckpointStorage
    FsStateBackend          HashMapStateBackend + FileSystemCheckpointStorage
    RocksDBStateBackend     EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage
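The table can also be read as a pair of independent choices per legacy backend. As a purely illustrative sketch (plain Java with no Flink dependency; the enum LegacyBackend, its fields and fromLegacyName are hypothetical names, not Flink API), the mapping can be encoded like this:

```java
import java.util.Arrays;

/** Hypothetical helper: encodes the legacy-to-new migration table as data (no Flink classes used). */
enum LegacyBackend {
    MEMORY_STATE_BACKEND("MemoryStateBackend", "HashMapStateBackend", "JobManagerCheckpointStorage"),
    FS_STATE_BACKEND("FsStateBackend", "HashMapStateBackend", "FileSystemCheckpointStorage"),
    ROCKSDB_STATE_BACKEND("RocksDBStateBackend", "EmbeddedRocksDBStateBackend", "FileSystemCheckpointStorage");

    final String legacyName;        // class name of the deprecated backend
    final String stateBackend;      // where working state lives
    final String checkpointStorage; // where checkpoints are written

    LegacyBackend(String legacyName, String stateBackend, String checkpointStorage) {
        this.legacyName = legacyName;
        this.stateBackend = stateBackend;
        this.checkpointStorage = checkpointStorage;
    }

    /** Looks up a row of the table by the legacy class name, e.g. "RocksDBStateBackend". */
    static LegacyBackend fromLegacyName(String name) {
        return Arrays.stream(values())
                .filter(b -> b.legacyName.equals(name))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown legacy backend: " + name));
    }

    public static void main(String[] args) {
        LegacyBackend b = fromLegacyName("RocksDBStateBackend");
        System.out.println(b.stateBackend + " + " + b.checkpointStorage);
    }
}
```

Running main prints "EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage", which is the row that applies to this question.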

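    In your case you want to use the EmbeddedRocksDBStateBackend with FileSystemCheckpointStorage. The problem you are currently having is that you are using in-memory checkpoint storage (JobManagerCheckpointStorage) with RocksDB. This is what Flink falls back to when neither a checkpoint storage nor a checkpoint directory is configured, and by default it rejects any state snapshot larger than 5 MB (the maxSize=5242880 bytes in your error), which severely limits how much state can be checkpointed.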
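To see why the job ended up with JobManagerCheckpointStorage, it helps to spell out how checkpoint storage is chosen from configuration. The sketch below is a simplified, hypothetical model of that rule in plain Java (it does not call Flink, and the class and method names are made up for illustration): an explicit state.checkpoint-storage wins; otherwise a configured state.checkpoints.dir implies FileSystemCheckpointStorage; otherwise checkpoints stay in JobManager memory. It also replays the size check from the error message, assuming the 5 MiB default limit (5 * 1024 * 1024 = 5242880 bytes, the maxSize reported above).

```java
import java.util.Map;

/**
 * Simplified, hypothetical model of how checkpoint storage is picked from configuration.
 * Plain Java only: it mirrors the documented behavior, not Flink's actual loader code.
 */
class CheckpointStorageResolution {

    // Assumed default size limit of JobManagerCheckpointStorage: 5 MiB = 5 * 1024 * 1024 bytes.
    static final long JOB_MANAGER_MAX_STATE_SIZE = 5L * 1024 * 1024;

    /** Returns the name of the checkpoint storage the given configuration would select. */
    static String resolve(Map<String, String> conf) {
        String storage = conf.get("state.checkpoint-storage");
        if (storage != null) {
            // An explicit choice wins; this sketch only handles the two built-in shortcut names.
            switch (storage) {
                case "jobmanager": return "JobManagerCheckpointStorage";
                case "filesystem": return "FileSystemCheckpointStorage";
                default: throw new IllegalArgumentException("Unhandled storage: " + storage);
            }
        }
        // No explicit choice: a checkpoint directory implies filesystem storage ...
        if (conf.containsKey("state.checkpoints.dir")) {
            return "FileSystemCheckpointStorage";
        }
        // ... and without one, checkpoints are kept in JobManager memory.
        return "JobManagerCheckpointStorage";
    }

    /** Mirrors the size check behind the error: state larger than the limit is rejected. */
    static boolean fitsInJobManagerStorage(long stateSizeBytes) {
        return stateSizeBytes <= JOB_MANAGER_MAX_STATE_SIZE;
    }

    public static void main(String[] args) {
        // The question's setup: RocksDB state backend, but no checkpoint directory configured.
        Map<String, String> question = Map.of("state.backend", "rocksdb");
        System.out.println(resolve(question));                   // JobManagerCheckpointStorage
        System.out.println(fitsInJobManagerStorage(9_126_648L)); // false: 9126648 > 5242880

        // With a checkpoint directory, filesystem storage is selected and the 5 MiB cap no longer applies.
        Map<String, String> fixed = Map.of(
                "state.backend", "rocksdb",
                "state.checkpoints.dir", "file:///checkpoint-dir/");
        System.out.println(resolve(fixed));                      // FileSystemCheckpointStorage
    }
}
```

The real loader also considers storage set programmatically via the CheckpointConfig; this model deliberately covers only the configuration keys used in this answer.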

    You can fix this by specifying a checkpoint directory either in flink-conf.yaml:

    state.backend: rocksdb
    state.checkpoints.dir: file:///checkpoint-dir/
    
    # Optional, Flink will automatically default to FileSystemCheckpointStorage
    # when a checkpoint directory is specified.
    state.checkpoint-storage: filesystem
    
    

    or in your code:

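    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Working state lives in RocksDB on local disk; "true" enables incremental checkpoints,
    // like the second argument of the legacy new RocksDBStateBackend("path", true).
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
    // Checkpoints go to this path (Flink creates a FileSystemCheckpointStorage for it).
    env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");

    // If you manually passed an FsStateBackend into the RocksDBStateBackend constructor
    // to specify advanced checkpointing configurations such as write buffer size,
    // you can achieve the same results by manually instantiating a FileSystemCheckpointStorage object.
    env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));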
    

    See the docs on Migrating from Legacy Backends for complete details.