java testing apache-flink flink-streaming

How to get Flink Operator State TTL to work


I am not able to get operator state TTL to work. I have set the TTL to 5 minutes, so I expect the state to be cleared after that time. When the filter finds the state empty because the TTL has expired, it should refresh the state:

public static class TestFilter extends RichFilterFunction<A>
      implements CheckpointedFunction {
    final Helper helper;
    final ConfigFetcher configFetcher;
    private final ListStateDescriptor<Integer> listStateDescriptor =
        new ListStateDescriptor<>(
            "state_descriptor", TypeInformation.of(new TypeHint<>() {}));

    @SuppressWarnings("NullAway.Init")
    private transient ListState<Integer> listState;

    TestFilter(
        Helper helper,
        ConfigFetcher configFetcher) {
      this.helper = helper;
      this.configFetcher = configFetcher;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
      StateTtlConfig stateTTLConfig =
          StateTtlConfig.newBuilder(Time.minutes(5))
              .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
              .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
              .build();
      listStateDescriptor.enableTimeToLive(stateTTLConfig);
      listState = context.getOperatorStateStore().getListState(listStateDescriptor);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {}

    @Override
    public boolean filter(A element) throws Exception {
        Set<Integer> ids = new HashSet<>();
        if (listState.get() == null) {
          ids = configFetcher.getIds();
          listState.update(new ArrayList<>(ids));
        } else {
          listState.get().forEach(ids::add);
          if (ids.isEmpty()) {
            ids = configFetcher.getIds();
            listState.update(new ArrayList<>(ids));
          }
        }
        return ids.contains(element);
    }
}

When I test this, I see that the state is not cleared or refreshed after 5 minutes. Does anyone know why?


Solution

  • I believe that state TTL is only supported for keyed state. The implementation doesn't handle operator state.

    This is why, in the documentation, State TTL is a subsection underneath Using Keyed State: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
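
    If TTL is indeed not applied to operator state, one possible workaround is to track the load time yourself and reload the ids once they are older than the intended TTL. The following is only a minimal plain-Java sketch of that idea, not Flink API: `ExpiringIdCache`, its `loader` parameter (standing in for `configFetcher::getIds`) and the injected `clock` are hypothetical names introduced for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical helper: caches a set of ids and reloads it from the loader
 * once the cached copy is at least ttlMillis old.
 */
final class ExpiringIdCache {
    private final long ttlMillis;
    private final Supplier<Set<Integer>> loader; // assumption: wraps configFetcher::getIds
    private final LongSupplier clock;            // e.g. System::currentTimeMillis

    private Set<Integer> ids = new HashSet<>();
    private long loadedAt;
    private boolean loaded = false;

    ExpiringIdCache(long ttlMillis, Supplier<Set<Integer>> loader, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.loader = loader;
        this.clock = clock;
    }

    /** Returns the cached ids, reloading them first if they were never loaded or have expired. */
    Set<Integer> get() {
        long now = clock.getAsLong();
        if (!loaded || now - loadedAt >= ttlMillis) {
            ids = new HashSet<>(loader.get()); // defensive copy of the freshly fetched ids
            loadedAt = now;
            loaded = true;
        }
        return ids;
    }
}
```

    Under these assumptions, the filter could hold such an object in a transient field created in `open()`, and `filter` would consult `cache.get()` instead of the list state. The cache is simply rebuilt after a restart, so it would not need to be checkpointed.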