I have a `ValueState` in a keyed process function that caches computed data which is frequently accessed and quite expensive to compute.
From the Flink docs, I understand that setting a TTL does not clean up state eagerly; expired entries are only removed lazily. This is a problem because I have a lot of data in state, which is causing the job to run out of memory.
Is there a way I can explicitly free up state? Something like: if the state has not been read for more than 10 minutes, evict it from RAM.
You can refer to the section on "cleanup of expired state": https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#cleanup-of-expired-state
For the heap state backend, you can enable incremental cleanup. It is not quite a fixed timeout, though.
Another option is to trigger cleanup of some state entries incrementally. The trigger can be a callback from each state access and/or each record processing. If this cleanup strategy is active for certain state, the storage backend keeps a lazy global iterator for this state over all its entries. Every time incremental cleanup is triggered, the iterator is advanced. The traversed state entries are checked, and expired ones are cleaned up.
This feature can be configured in StateTtlConfig:
```scala
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  .cleanupIncrementally(10, true)
  .build
```
For the RocksDB state backend, you can configure cleanup in the RocksDB compaction filter. It has its own caveats too.
If the RocksDB state backend is used, a Flink-specific compaction filter is called for the background cleanup. RocksDB periodically runs asynchronous compactions to merge state updates and reduce storage. The Flink compaction filter checks the expiration timestamp of state entries with TTL and excludes expired values.
This feature can be configured in StateTtlConfig:
```scala
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  .cleanupInRocksdbCompactFilter(1000)
  .build
```
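Whichever cleanup strategy you pick, note that the TTL config only takes effect once you attach it to the state descriptor via `enableTimeToLive`. A minimal sketch of how this could look in your keyed process function (the class name, `String` types, and `expensiveComputation` are hypothetical placeholders; the 10-minute TTL matches your question):

```scala
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class CachingFunction extends KeyedProcessFunction[String, String, String] {

  @transient private var cache: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.minutes(10))                            // entries expire 10 minutes after last access
      .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // reads also refresh the TTL
      .cleanupIncrementally(10, true)                          // heap backend: clean a few entries per access/record
      .build

    val descriptor = new ValueStateDescriptor[String]("cachedResult", classOf[String])
    descriptor.enableTimeToLive(ttlConfig) // without this call, the TTL config does nothing
    cache = getRuntimeContext.getState(descriptor)
  }

  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit = {
    if (cache.value() == null) {
      cache.update(expensiveComputation(value)) // only recompute on a cache miss
    }
    out.collect(cache.value())
  }

  // Placeholder for the expensive computation described in the question.
  private def expensiveComputation(v: String): String = v
}
```

With `OnReadAndWrite`, frequently read entries stay cached while idle ones become eligible for cleanup, which is close to the "not read for 10 minutes" behavior you described.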