Search code examples
javaapache-flinkflink-streaming

Does statebackend need checkpoint?


Below is the code from here that maintains the checkpoint:

class EnrichmentStream {
    private val checkpointsDir  = "file://${System.getProperty("user.dir")}/checkpoints/"
    private val rocksDBStateDir = "file://${System.getProperty("user.dir")}/state/rocksdb/"

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            EnrichmentStream().runStream()
        }
    }

    fun runStream() {
        val environment = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(Configuration())

        environment.parallelism = 3

        // Checkpoint Configurations
        environment.enableCheckpointing(5000)
        environment.checkpointConfig.minPauseBetweenCheckpoints = 100
        environment.checkpointConfig.setCheckpointStorage(checkpointsDir)

        val stateBackend = EmbeddedRocksDBStateBackend()
        stateBackend.setDbStoragePath(rocksDBStateDir)
        environment.stateBackend = stateBackend

        environment.checkpointConfig.externalizedCheckpointCleanup =
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION

        // Configure Restart Strategy
        environment.restartStrategy = RestartStrategies.fixedDelayRestart(5, Time.seconds(5))

        val tableEnvironment = StreamTableEnvironment.create(environment)

        // Run some SQL queries to check the existing Catalogs, Databases and Tables
        tableEnvironment
            .executeSql("SHOW CATALOGS")
            .print()

        tableEnvironment
            .executeSql("SHOW DATABASES")
            .print()

        tableEnvironment
            .executeSql("SHOW TABLES")
            .print()

        tableEnvironment
            .executeSql(Queries.CREATE_SENSORS_TABLE)
            .print()

        tableEnvironment
            .executeSql(Queries.CREATE_READINGS_TABLE)
            .print()

        tableEnvironment
            .executeSql("SHOW TABLES")
            .print()

        tableEnvironment
            .executeSql(Queries.JOIN_SENSOR_READINGS_WITH_INFO_QUERY)
            .print()
    }
}

Does flink application with statebackend need checkpoint?


Solution

  • Yes, you need checkpointing. Without it, the state will be lost in the event of a failure.

    However, the rocksDBStateDir and checkpointsDir should be in different file systems. The rocksDBStateDir should be in the fastest available local file system. This is where the working state is kept, and it doesn't matter if this disk is lost when failures occur, since the state will be restored from the latest checkpoint. On the other hand, checkpointsDir should be in a distributed file system, to ensure that failure recovery is always possible.