In the code below:
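```kotlin
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

class EnrichmentStream {
    private val checkpointsDir = "file://${System.getProperty("user.dir")}/checkpoints/"
    private val rocksDBStateDir = "file://${System.getProperty("user.dir")}/state/rocksdb/"

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            EnrichmentStream().runStream()
        }
    }

    fun runStream() {
        // Local mini-cluster with the web UI enabled
        val environment = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(Configuration())
        // setParallelism/setStateBackend return the environment (fluent style), so Kotlin
        // does not expose them as writable properties; call the setters directly.
        environment.setParallelism(3)

        // Checkpoint configuration
        environment.enableCheckpointing(5000)
        environment.checkpointConfig.minPauseBetweenCheckpoints = 100
        environment.checkpointConfig.setCheckpointStorage(checkpointsDir)

        // RocksDB state backend with a local working directory
        val stateBackend = EmbeddedRocksDBStateBackend()
        stateBackend.setDbStoragePath(rocksDBStateDir)
        environment.setStateBackend(stateBackend)

        // Keep externalized checkpoints when the job is cancelled
        environment.checkpointConfig.externalizedCheckpointCleanup =
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION

        // Restart strategy: up to 5 attempts, 5 seconds apart
        environment.restartStrategy = RestartStrategies.fixedDelayRestart(5, Time.seconds(5))

        val tableEnvironment = StreamTableEnvironment.create(environment)

        // Run some SQL queries to check the existing catalogs, databases and tables
        tableEnvironment.executeSql("SHOW CATALOGS").print()
        tableEnvironment.executeSql("SHOW DATABASES").print()
        tableEnvironment.executeSql("SHOW TABLES").print()

        // Connector options are elided ("...") in this question
        tableEnvironment
            .executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
            .print()
    }
}
```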
Can the Flink state backend delete old records (more than 2 hours old) from the RocksDB table (Orders), so that at any point in time the Orders table only holds records that are less than 2 hours old? We use this data for comparison.
If yes, what is the syntax in CREATE TABLE to specify that interval?
What you can do is set table.exec.state.ttl to two hours; see the docs for the specifics.
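Applied to a Kotlin program like the one in the question, setting the option could look roughly like the minimal sketch below. It assumes Flink 1.x, where TableConfig.getConfiguration() returns the mutable Configuration read by the planner, and that the option is set before any query is submitted.

```kotlin
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

fun main() {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnvironment = StreamTableEnvironment.create(environment)

    // Job-wide idle-state retention: state that has not been updated for 2 hours
    // becomes eligible for cleanup. It applies to every stateful SQL operator in
    // this job and cannot be scoped to a single table.
    tableEnvironment.config.configuration.setString("table.exec.state.ttl", "2 h")

    // Read the option back as a sanity check
    println(tableEnvironment.config.configuration.getString("table.exec.state.ttl", "<unset>"))
}
```

In the SQL client, the equivalent would be SET 'table.exec.state.ttl' = '2 h';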
The effect of this will be to expire all state in this job that has not been updated for two hours. There's (currently) no way to specify the state TTL on a per-table basis.
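To make the idle-state semantics concrete, here is a toy, in-memory model in plain Kotlin. It is not Flink code, and IdleStateStore is a hypothetical name: each key remembers when it was last written, and a read treats it as gone once it has been idle for longer than the TTL. Writing a key again resets its timer, so the rule is "not updated for two hours" rather than "inserted more than two hours ago".

```kotlin
import java.time.Duration
import java.time.Instant

// Toy model of idle-state TTL, NOT Flink's implementation: a value expires once it
// has not been written for longer than `ttl`. Time is passed in explicitly so the
// behaviour is deterministic.
class IdleStateStore<K, V>(private val ttl: Duration) {
    private data class Entry<T>(val value: T, val lastWrite: Instant)

    private val entries = HashMap<K, Entry<V>>()

    // Inserting or updating a key resets its idle timer.
    fun put(key: K, value: V, now: Instant) {
        entries[key] = Entry(value, now)
    }

    // Returns null if the key is absent or has been idle for longer than the TTL;
    // expired entries are removed lazily, when they are read.
    fun get(key: K, now: Instant): V? {
        val entry = entries[key] ?: return null
        if (Duration.between(entry.lastWrite, now) > ttl) {
            entries.remove(key)
            return null
        }
        return entry.value
    }
}

fun main() {
    val t0 = Instant.parse("2024-01-01T00:00:00Z")
    val store = IdleStateStore<String, Int>(Duration.ofHours(2))

    store.put("alice", 10, t0)
    store.put("bob", 20, t0)
    // bob is updated after 90 minutes, which resets his timer
    store.put("bob", 25, t0.plus(Duration.ofMinutes(90)))

    val later = t0.plus(Duration.ofMinutes(150))
    println(store.get("alice", later)) // null: idle for 150 minutes
    println(store.get("bob", later))   // 25: idle for only 60 minutes
}
```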
Update:
In many cases it's not necessary to do this sort of state management yourself. If you construct your queries so that the temporal constraints are part of the query, then Flink SQL will automatically clear state that is no longer useful.
Queries that can do this include interval and temporal joins, windows, OVER aggregations, and MATCH_RECOGNIZE.
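For the comparison use case in the question, an OVER aggregation bounded to the preceding two hours might look like the sketch below. It assumes Orders has an event-time attribute: the order_time column, its watermark, and the datagen connector options are assumptions added so the sketch is self-contained. They are not part of the question's schema, whose connector options are elided.

```kotlin
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

fun main() {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnvironment = StreamTableEnvironment.create(environment)

    // Hypothetical Orders table: order_time and its watermark are assumptions, and
    // datagen stands in for the real connector.
    tableEnvironment.executeSql(
        """
        CREATE TABLE Orders (
          `user` BIGINT,
          product STRING,
          amount INT,
          order_time TIMESTAMP(3),
          WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
        ) WITH (
          'connector' = 'datagen',
          'rows-per-second' = '5',
          'fields.product.length' = '1',
          'fields.amount.min' = '1',
          'fields.amount.max' = '100'
        )
        """.trimIndent()
    )

    // Compare each order with the same product's orders from the preceding two hours
    // of event time.
    tableEnvironment.executeSql(
        """
        SELECT product, amount, order_time,
               AVG(amount) OVER (
                 PARTITION BY product
                 ORDER BY order_time
                 RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW
               ) AS avg_amount_last_2h
        FROM Orders
        """.trimIndent()
    ).print()
}
```

The query is unbounded, so print() keeps streaming results until the job is cancelled. Because the window is bounded in event time, Flink should be able to discard a row once the watermark has moved more than two hours past it, without relying on table.exec.state.ttl.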