Tags: apache-flink, flink-streaming

What happens if a key is never seen again but RocksDB still holds state for that key?


Let's say I have a process function like this one (with the RocksDB state backend):

public class Test extends KeyedProcessFunction<...>
{
    private transient ValueState<Integer> ...;
    ...

    @Override
    public void open(Configuration parameters) throws Exception
    {

        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(10))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupInRocksdbCompactFilter(1000)
                .build();

        ValueStateDescriptor<Integer> testDescr = new ValueStateDescriptor<Integer>(
                "test",
                TypeInformation.of(Integer.class)
        );
        testDescr.enableTimeToLive(ttlConfig);
        ...
    }
}

kafkaSource.keyBy(object -> object.getKey()).process(new Test())...;

Assume that this is an unbounded streaming application. Let's say I have seen the key "orange" only once (or just assume that the process function was called once for the key "orange"), and assume that the key "orange" will never appear again. In that case, will the state for the key "orange" stay in RocksDB forever?


Solution

  • No. Because cleanupInRocksdbCompactFilter is enabled, the state for the inactive key "orange" will be removed from RocksDB during the first compaction that processes it once 10 minutes have elapsed since that state was created or last written (the TTL builder was configured with a 10-minute timeout and UpdateType.OnCreateAndWrite). Until then the state lingers in RocksDB, but because you have configured StateVisibility.NeverReturnExpired, Flink will behave as if it isn't there should you try to access it.
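To make the difference between "expired but still stored" and "physically removed" concrete, here is a minimal toy model in plain Java. It is only a sketch of the behavior described above, not how Flink or RocksDB implement it: the class TtlStoreSketch and its methods put, get, compact and physicalSize are hypothetical names made up for illustration, and time is passed in explicitly so the example stays deterministic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Toy model of lazy TTL cleanup. All names here are hypothetical;
 * this is NOT Flink's or RocksDB's actual implementation.
 */
class TtlStoreSketch {
    /** A stored value together with the time of its last write. */
    private static final class Entry {
        final int value;
        final long lastWriteMillis;

        Entry(int value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> entries = new HashMap<>();

    TtlStoreSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Every write restarts the TTL clock, in the spirit of UpdateType.OnCreateAndWrite. */
    void put(String key, int value, long nowMillis) {
        entries.put(key, new Entry(value, nowMillis));
    }

    /** Expired entries are hidden from reads, in the spirit of StateVisibility.NeverReturnExpired. */
    Integer get(String key, long nowMillis) {
        Entry e = entries.get(key);
        if (e == null || isExpired(e, nowMillis)) {
            return null;
        }
        return e.value;
    }

    /** Stand-in for a compaction pass: only here are expired entries physically dropped. */
    int compact(long nowMillis) {
        int removed = 0;
        Iterator<Entry> it = entries.values().iterator();
        while (it.hasNext()) {
            if (isExpired(it.next(), nowMillis)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    /** Number of entries still physically stored, expired or not. */
    int physicalSize() {
        return entries.size();
    }

    private boolean isExpired(Entry e, long nowMillis) {
        return nowMillis - e.lastWriteMillis >= ttlMillis;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        TtlStoreSketch store = new TtlStoreSketch(tenMinutes);
        store.put("orange", 1, 0L);          // "orange" is seen exactly once

        long later = tenMinutes + 1;
        System.out.println(store.get("orange", later)); // expired, so hidden from reads
        System.out.println(store.physicalSize());       // yet still physically stored
        store.compact(later);
        System.out.println(store.physicalSize());       // dropped by the compaction pass
    }
}
```

In this model, as in the answer above, nothing is deleted the moment the TTL elapses. The entry simply becomes invisible to reads and is dropped later, whenever a compaction pass gets to it.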