apache-flink flink-streaming flink-cep

About TTL configuration for States in Flink


Let's assume that I have this configuration for a descriptor, and that the following actions are taken:

ValueStateDescriptor<Event> descriptor = ...;

StateTtlConfig ttlConfigOneHourAndReturnExpire = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();

descriptor.enableTimeToLive(ttlConfigOneHourAndReturnExpire);

/* after one hour, when the state has expired */
Event e = state.value(); // steps 1 and 2
e.count = e.count + 1;   // step 3
state.update(e);         // step 4

Does this mean that after one hour, when the state has already expired, things will happen in this order?

  1. The previous value of the state is returned, even though it has expired.
  2. The previous value is cleaned up after that read.
  3. The object is updated after the previous value was returned and cleaned up (on read).
  4. Updating the state in this case means creating the state again, because the previous one was already deleted, and this new value lives for one more hour. Or is the state cleaned up at this point rather than at step 1, so that the object does not include the update and is stored in state as it arrived?

I hope I have explained myself, because the documentation is not clear to me.

My starting point is that I need to clean up the state when the day changes, and there is no way to do that with TTL. So I want to clean the state every hour, but read the state before it is removed, update the current value, and then create the state again for one more hour, always having access to the previous state before losing it.

I hope this makes sense and is possible somehow. Kind regards!


Solution

  • If you need to manipulate the state every hour, then create a custom ProcessFunction and use a timer to trigger that action.
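
As a rough illustration of the timer idea above, here is a minimal plain-Java model of the control flow: count events per key, and when the hourly timer fires, read the old value first and only then clear it. It deliberately uses no Flink classes, and the names HourlyResetSketch, advanceTo and emitted are hypothetical. In a real job the same two steps would presumably live in the processElement and onTimer methods of a KeyedProcessFunction, with the maps replaced by keyed state and timers registered through the timer service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java model of a timer-driven hourly reset; no Flink classes are used.
class HourlyResetSketch {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // Stands in for keyed ValueState<Long>: one count per key.
    private final Map<String, Long> counts = new HashMap<>();
    // Stands in for registered timers: key -> timestamp at which the timer fires.
    private final Map<String, Long> timers = new HashMap<>();
    // Collects what onTimer emits.
    final List<String> emitted = new ArrayList<>();

    // Counterpart of processElement: update the count and make sure a timer exists.
    void processElement(String key, long nowMs) {
        advanceTo(nowMs); // fire timers that became due before this event
        counts.merge(key, 1L, Long::sum);
        long nextHour = nowMs - (nowMs % HOUR_MS) + HOUR_MS;
        timers.putIfAbsent(key, nextHour); // at most one pending timer per key
    }

    // Fires every timer whose timestamp is less than or equal to nowMs.
    void advanceTo(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> timer = it.next();
            String key = timer.getKey();
            long fireAt = timer.getValue();
            if (fireAt <= nowMs) {
                it.remove();
                onTimer(key);
            }
        }
    }

    // Counterpart of onTimer: read the previous value, emit it, then clear the state.
    void onTimer(String key) {
        Long previous = counts.remove(key);
        emitted.add(key + "=" + previous);
    }

    public static void main(String[] args) {
        HourlyResetSketch sketch = new HourlyResetSketch();
        sketch.processElement("user-1", 10L * 60_000L); // 00:10
        sketch.processElement("user-1", 50L * 60_000L); // 00:50
        sketch.processElement("user-1", 70L * 60_000L); // 01:10, fires the 01:00 timer first
        System.out.println(sketch.emitted); // prints [user-1=2]
    }
}
```

The point of the design is that the reset is explicit: the old value is always available inside the timer callback before it is cleared, so nothing depends on when TTL cleanup happens to run. The same pattern could be aligned to day boundaries by computing the next midnight instead of the next full hour.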