apache-flink rocksdb

Can we update a state's TTL value?


We have a topology that uses state (ValueState and ListState) with a TTL (StateTtlConfig) because we cannot use timers: we would generate hundreds of millions of timers per day, and that does not scale (a savepoint/checkpoint would take hours to generate and might even get stuck while running).

However, we need to update the TTL value at runtime, depending on the type of some incoming events and on other logic. Is it all right to create a new state with a new StateTtlConfig (and an updated TTL) and copy the values from the "old" state to the "new" one in the processElement1() and processElement2() methods of a CoProcessFunction, instead of creating the state once in open() as we usually do?

I guess the "old" state would be garbage collected (?).

Would this solution scale? Would it perform well, or could it cause any issues?


Solution

  • I think your approach of re-creating the state at runtime can work to some extent, but it is brittle. The problem I can see is that the meta information of the old state can linger, depending on the backend implementation.

    For the heap (FS) backend, the checkpoint/savepoint will eventually contain no records for the expired old state, but its meta information can linger in memory while the job is running. It goes away when the job is restarted.

    For RocksDB, the column family of the old state can linger. Moreover, background cleanup runs only during compaction. If the table is too small, like the part that is still in memory, that part (and maybe a bit on disk) will linger. It goes away after a restart if cleanup on full snapshot is active (this does not apply to incremental checkpoints).

    All in all, it depends on how often you have to create a new state and how often you restart your job from a savepoint/checkpoint.

    I created a ticket to document what can be changed in the TTL config and when; see the issue for details.
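
    To make the lingering concrete, here is a toy model in plain Java. It is not Flink code and uses no Flink API: ToyStateBackend, ToyTtlState, migrate() and cleanupAll() are hypothetical names invented for this sketch, and the model simply assumes that the backend keeps one registration per state name and never drops it while the job runs. Copying a value into a state registered under a new name restarts its TTL clock, and the old entry expires on its own, but the old registration stays.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only. This is NOT Flink code and does not use any Flink API.
class ToyStateBackend {

    // One stored value plus the time of its last write.
    private static final class Entry {
        final String value;
        final long writtenAtMs;

        Entry(String value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    // A named state whose TTL is fixed when the state is first registered.
    static final class ToyTtlState {
        private final long ttlMs;
        private final Map<String, Entry> byKey = new HashMap<>();

        ToyTtlState(long ttlMs) {
            this.ttlMs = ttlMs;
        }

        void put(String key, String value, long nowMs) {
            byKey.put(key, new Entry(value, nowMs));
        }

        // Expired values are hidden from readers even before cleanup removes them.
        String get(String key, long nowMs) {
            Entry e = byKey.get(key);
            return (e == null || isExpired(e, nowMs)) ? null : e.value;
        }

        // Drops expired entries and returns how many live entries remain.
        int cleanupExpired(long nowMs) {
            byKey.values().removeIf(e -> isExpired(e, nowMs));
            return byKey.size();
        }

        private boolean isExpired(Entry e, long nowMs) {
            return nowMs - e.writtenAtMs >= ttlMs;
        }
    }

    // Assumption of this model: registrations are never removed while running.
    private final Map<String, ToyTtlState> registered = new LinkedHashMap<>();

    // Returns the state registered under this name; an existing one keeps its TTL.
    ToyTtlState getOrCreate(String name, long ttlMs) {
        return registered.computeIfAbsent(name, n -> new ToyTtlState(ttlMs));
    }

    int registeredStateCount() {
        return registered.size();
    }

    // Copies a live value into a state with a different TTL. The old entry is
    // left alone and expires under the old TTL.
    boolean migrate(String key, String oldName, String newName, long newTtlMs, long nowMs) {
        ToyTtlState old = registered.get(oldName);
        String value = (old == null) ? null : old.get(key, nowMs);
        if (value == null) {
            return false;
        }
        getOrCreate(newName, newTtlMs).put(key, value, nowMs);
        return true;
    }

    // Runs cleanup on every registered state and returns the total live entries.
    int cleanupAll(long nowMs) {
        int live = 0;
        for (ToyTtlState state : registered.values()) {
            live += state.cleanupExpired(nowMs);
        }
        return live;
    }

    public static void main(String[] args) {
        ToyStateBackend backend = new ToyStateBackend();
        backend.getOrCreate("events-ttl-1000", 1_000L).put("user-1", "payload", 0L);
        // A later event asks for a longer TTL, so the value is copied over.
        backend.migrate("user-1", "events-ttl-1000", "events-ttl-5000", 5_000L, 500L);
        // Long after both TTLs have passed, every entry is gone...
        System.out.println(backend.cleanupAll(10_000L));      // prints 0
        // ...but both registrations are still there.
        System.out.println(backend.registeredStateCount());   // prints 2
    }
}
```

    In this model the number of registrations grows by one for every distinct state name that is ever used, so a small fixed set of TTL values is much less of a concern than a TTL computed per event.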