I can't get state TTL to work for operator state. I set the TTL to 5 minutes, so I expect the state to be cleared after that time. When the function sees that the state is empty because the TTL has expired, it should refresh the state:
    public static class TestFilter extends RichFilterFunction<A>
            implements CheckpointedFunction {

        final Helper helper;
        final ConfigFetcher configFetcher;

        private final ListStateDescriptor<Integer> listStateDescriptor =
                new ListStateDescriptor<>(
                        "state_descriptor", TypeInformation.of(new TypeHint<Integer>() {}));

        @SuppressWarnings("NullAway.Init")
        private transient ListState<Integer> listState;

        TestFilter(Helper helper, ConfigFetcher configFetcher) {
            this.helper = helper;
            this.configFetcher = configFetcher;
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // TTL of 5 minutes, reset on create/write; expired values should never be returned.
            StateTtlConfig stateTtlConfig =
                    StateTtlConfig.newBuilder(Time.minutes(5))
                            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                            .build();
            listStateDescriptor.enableTimeToLive(stateTtlConfig);
            // Note: this is operator state, not keyed state.
            listState = context.getOperatorStateStore().getListState(listStateDescriptor);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) {}

        @Override
        public boolean filter(A element) throws Exception {
            Set<Integer> ids = new HashSet<>();
            if (listState.get() == null) {
                // No state yet: fetch the ids and store them.
                ids = configFetcher.getIds();
                listState.update(new ArrayList<>(ids));
            } else {
                listState.get().forEach(ids::add);
                if (ids.isEmpty()) {
                    // State is empty (expected after TTL expiry): refresh it.
                    ids = configFetcher.getIds();
                    listState.update(new ArrayList<>(ids));
                }
            }
            return ids.contains(element);
        }
    }
When I test this, the state is not cleared or refreshed after 5 minutes. Does anyone know why?
I believe that state TTL is only supported for keyed state. The implementation doesn't handle operator state, so enabling TTL on a descriptor that you pass to getOperatorStateStore() has no effect.
This is why, in the documentation, State TTL is a subsection underneath Using Keyed State: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
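If you only need the ids to be re-fetched every 5 minutes, one possible workaround is to track the expiry yourself instead of relying on TTL. The following is a minimal, Flink-independent sketch of that idea, not something taken from the Flink API: the class name ExpiringIds, the injectable clock, and the fetcher parameter (standing in for configFetcher::getIds) are all hypothetical, and I have not run it inside a job.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper: caches ids and re-fetches them once they are older than ttlMillis. */
class ExpiringIds {
    private final Supplier<Set<Integer>> fetcher; // assumption: stands in for configFetcher::getIds
    private final LongSupplier clock;             // e.g. System::currentTimeMillis
    private final long ttlMillis;

    private Set<Integer> ids = new HashSet<>();
    private long lastRefreshMillis;
    private boolean loaded = false;

    ExpiringIds(Supplier<Set<Integer>> fetcher, LongSupplier clock, long ttlMillis) {
        this.fetcher = fetcher;
        this.clock = clock;
        this.ttlMillis = ttlMillis;
    }

    /** Returns the cached ids, fetching them first if never loaded or older than the TTL. */
    Set<Integer> get() {
        long now = clock.getAsLong();
        if (!loaded || now - lastRefreshMillis >= ttlMillis) {
            ids = new HashSet<>(fetcher.get());
            lastRefreshMillis = now;
            loaded = true;
        }
        return ids;
    }
}
```

In the filter function you would keep this in a transient field, create it in open() (the lambdas are not serializable), and call get().contains(...) from filter(). Note that nothing here is checkpointed: after a restart the ids are simply fetched again. If the ids must live in Flink state with real TTL semantics, the state has to be keyed state on a keyed stream.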