Below is the code (from here) that configures checkpointing:
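```kotlin
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

class EnrichmentStream {
    private val checkpointsDir = "file://${System.getProperty("user.dir")}/checkpoints/"
    private val rocksDBStateDir = "file://${System.getProperty("user.dir")}/state/rocksdb/"

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            EnrichmentStream().runStream()
        }
    }

    fun runStream() {
        // Local mini-cluster with the web UI (requires flink-runtime-web on the classpath)
        val environment = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(Configuration())
        environment.parallelism = 3

        // Checkpoint configuration: checkpoint every 5 s, at least 100 ms apart
        environment.enableCheckpointing(5000)
        environment.checkpointConfig.minPauseBetweenCheckpoints = 100
        environment.checkpointConfig.setCheckpointStorage(checkpointsDir)

        // State backend: RocksDB keeps the working state in local files
        val stateBackend = EmbeddedRocksDBStateBackend()
        stateBackend.setDbStoragePath(rocksDBStateDir)
        environment.stateBackend = stateBackend

        // Keep the latest checkpoint when the job is cancelled
        environment.checkpointConfig.externalizedCheckpointCleanup =
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION

        // Restart strategy: up to 5 attempts, 5 s apart
        environment.restartStrategy = RestartStrategies.fixedDelayRestart(5, Time.seconds(5))

        val tableEnvironment = StreamTableEnvironment.create(environment)

        // Run some SQL queries to check the existing catalogs, databases and tables
        tableEnvironment.executeSql("SHOW CATALOGS").print()
        tableEnvironment.executeSql("SHOW DATABASES").print()
        tableEnvironment.executeSql("SHOW TABLES").print()

        // Queries holds the SQL strings (not shown here)
        tableEnvironment.executeSql(Queries.CREATE_SENSORS_TABLE).print()
        tableEnvironment.executeSql(Queries.CREATE_READINGS_TABLE).print()
        tableEnvironment.executeSql("SHOW TABLES").print()

        // Streaming join; print() blocks while the job runs
        tableEnvironment.executeSql(Queries.JOIN_SENSOR_READINGS_WITH_INFO_QUERY).print()
    }
}
```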
Does a Flink application that uses a state backend still need checkpointing?
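Yes, you need checkpointing. The state backend only determines where the working state is kept while the job runs; checkpoints are what periodically persist a consistent snapshot of that state to durable storage. Without them, the state will be lost in the event of a failure.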
However, `rocksDBStateDir` and `checkpointsDir` should be on different file systems. `rocksDBStateDir` should be on the fastest available local file system: this is where the working state is kept, and it doesn't matter if this disk is lost when a failure occurs, since the state will be restored from the latest checkpoint. `checkpointsDir`, on the other hand, should be on a distributed file system, so that failure recovery is always possible.
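A minimal sketch of that split, using the same Flink calls as the question's code. The paths `/mnt/local-ssd/flink/rocksdb` and `hdfs:///flink/checkpoints/enrichment-stream` and the helper name `configureState` are hypothetical; substitute your own local disk and distributed file system (an `hdfs://` URI needs Hadoop on the classpath, an `s3://` URI needs one of Flink's S3 file system plugins).

```kotlin
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Hypothetical path: fast local disk on each TaskManager for RocksDB's working files.
const val LOCAL_ROCKSDB_DIR = "/mnt/local-ssd/flink/rocksdb"

// Hypothetical path: distributed file system reachable from every node, for checkpoints.
const val CHECKPOINT_DIR = "hdfs:///flink/checkpoints/enrichment-stream"

// Hypothetical helper that applies the local/distributed split to an environment.
fun configureState(env: StreamExecutionEnvironment) {
    // Checkpoint every 5 s; this is what persists the RocksDB state durably.
    env.enableCheckpointing(5000)

    val rocksDb = EmbeddedRocksDBStateBackend()
    // Working state only: losing this disk is acceptable, because the job
    // restores from the latest checkpoint after a failure.
    rocksDb.setDbStoragePath(LOCAL_ROCKSDB_DIR)
    env.setStateBackend(rocksDb)

    // Durable snapshots: must survive the loss of any single machine.
    env.checkpointConfig.setCheckpointStorage(CHECKPOINT_DIR)
    env.checkpointConfig.externalizedCheckpointCleanup =
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
}
```

In `runStream()` this would replace the individual checkpoint and state-backend lines with a single `configureState(environment)` call. Note that with `createLocalEnvironmentWithWebUI` everything runs in one JVM on one machine, so the distinction mainly matters once the job is deployed to a cluster.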