Tags: java, stream, apache-flink, flink-streaming

Flink - Can statebackend allow purging old data in RocksDB table?


In the below code:

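import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

class EnrichmentStream {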
    private val checkpointsDir  = "file://${System.getProperty("user.dir")}/checkpoints/"
    private val rocksDBStateDir = "file://${System.getProperty("user.dir")}/state/rocksdb/"

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            EnrichmentStream().runStream()
        }
    }

    fun runStream() {
        val environment = StreamExecutionEnvironment
            .createLocalEnvironmentWithWebUI(Configuration())

        environment.parallelism = 3

        // Checkpoint Configurations
        environment.enableCheckpointing(5000)
        environment.checkpointConfig.minPauseBetweenCheckpoints = 100
        environment.checkpointConfig.setCheckpointStorage(checkpointsDir)

        val stateBackend = EmbeddedRocksDBStateBackend()
        stateBackend.setDbStoragePath(rocksDBStateDir)
        environment.stateBackend = stateBackend

        environment.checkpointConfig.externalizedCheckpointCleanup =
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION

        // Configure Restart Strategy
        environment.restartStrategy = RestartStrategies.fixedDelayRestart(5, Time.seconds(5))

        val tableEnvironment = StreamTableEnvironment.create(environment)

        // Run some SQL queries to check the existing Catalogs, Databases and Tables
        tableEnvironment
            .executeSql("SHOW CATALOGS")
            .print()

        tableEnvironment
            .executeSql("SHOW DATABASES")
            .print()

        tableEnvironment
            .executeSql("SHOW TABLES")
            .print()

        tableEnvironment
            .executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
            .print()

    }
}

Can the Flink state backend delete old records (more than 2 hours old) from the RocksDB-backed table (Orders), so that at any point in time the table only holds records less than 2 hours old? We use this data for comparison.

If so, what is the CREATE TABLE syntax for specifying that interval?


Solution

  • You can set table.exec.state.ttl to two hours -- see the docs for the specifics.

    The effect is that any state which has not been updated for two hours expires, for every stateful operator in the job. There's (currently) no way to specify the state TTL on a per-table basis.

    Update:

    In many cases it's not necessary to do this sort of state management yourself. If you construct your queries so that the temporal constraints are part of the query, then Flink SQL will automatically clear state that is no longer useful.

    Queries that can do this include interval and temporal joins, windows, OVER aggregations, and MATCH_RECOGNIZE.
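
Both options from the answer can be sketched in Kotlin as follows. This is only an illustration under several assumptions: Flink 1.15 or later (for TableConfig.set with string keys), the built-in datagen connector standing in for the elided WITH (...) options, an extra order_time column on Orders, and a hypothetical second table Payments to join against. None of these come from the question itself.

```kotlin
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

fun main() {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnvironment = StreamTableEnvironment.create(environment)

    // Option 1: job-wide idle-state TTL. State that has not been updated for
    // two hours becomes eligible for cleanup, in every stateful operator.
    tableEnvironment.config.set("table.exec.state.ttl", "2 h")

    // Option 2: express the two-hour bound in the query itself.
    // order_time and its watermark are assumed additions to the Orders schema.
    tableEnvironment.executeSql(
        """
        CREATE TABLE Orders (
            `user` BIGINT,
            product STRING,
            amount INT,
            order_time TIMESTAMP(3),
            WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'datagen',
            'number-of-rows' = '100',
            'fields.user.min' = '1',
            'fields.user.max' = '10'
        )
        """.trimIndent()
    )

    // Payments is a hypothetical table, used only to have something to compare with.
    tableEnvironment.executeSql(
        """
        CREATE TABLE Payments (
            `user` BIGINT,
            pay_time TIMESTAMP(3),
            WATERMARK FOR pay_time AS pay_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'datagen',
            'number-of-rows' = '100',
            'fields.user.min' = '1',
            'fields.user.max' = '10'
        )
        """.trimIndent()
    )

    // Interval join: a payment matches an order only if it arrives within two
    // hours of it, so Flink can drop rows from state once the watermark shows
    // they can no longer match.
    tableEnvironment.executeSql(
        """
        SELECT o.`user`, o.product, o.amount, p.pay_time
        FROM Orders o, Payments p
        WHERE o.`user` = p.`user`
          AND p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL '2' HOUR
        """.trimIndent()
    ).print()
}
```

The two options are independent: the TTL setting is a blunt, job-wide limit, while the interval join lets the planner clean up state based on the time bound in the query. In a real job you would normally pick whichever fits the comparison you need rather than combine them as this sketch does.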