apache-flink flink-streaming flink-batch flink-state flink-stateful-functions

Flink force cleanup keyed state for state descriptor


I'm currently developing an operator (a sink) that uses Flink's keyed state. The state backend is heap-based, and state TTL is set to 24 hours. The operator's use case is as follows: first we receive a request and store something in a ValueState, then we receive the response and apply some logic to the request and response together.

What I'm afraid of is that the state might grow much faster in the prod environment than I expect. So I thought about adding a featureFlag property, to be able to disable the operator quickly if anything goes wrong.

As I understand it, Flink dislikes cases where you add and remove stateful operators from the job graph between checkpoints, so I modified my operator: if the property is true, the operator works as expected; if it's false, it returns immediately and skips its functionality.

That should work, but I have no idea how to clean up state that has already been created if I decide to disable the featureFlag.

Because of the keyed nature of the state, I can't just call state.clear(), since there is no key context, and I don't want to wait 24 hours with oversized state. I thought about changing the ttlConfig to, for example, 1 minute instead of 24 hours when the featureFlag is false, but I checked the source code and realized that if state for the descriptor already exists in the state backend, the existing state is returned and the new TTL config is never used.

So

  1. It seems I can't change the TTL for existing state.
  2. It seems I can't clear the state, because I might never receive a second message with the same key, which I would need to get the key context for that state.

Is there any way to force-clean state by stateDescriptor? Or can I only wait until the TTL cleans it up?

P.S. This question seems to be related to "How does one cleanup Flink stream state for inactive keys?"

but I can't use the same approach because of

  1. The keyed nature of the state. Taking into account that ValueState is just a view of the state backend for the key currently being processed, I doubt that a callback would work.
  2. Limited resources. I think that storing millions of callbacks would lead to the same memory and checkpoint size problems, so it's easier to just wait until the TTL hits.

Solution

  • One option you haven't mentioned is as follows: Instead of holding the keyed state in something like a KeyedProcessFunction, use a KeyedBroadcastProcessFunction. Then when you want to clear some state, broadcast an event into that operator. In response to that incoming broadcast event, the processBroadcastElement method can call applyToKeyedState on the KeyedBroadcastProcessFunction.Context object that was passed in. This applyToKeyedState method can use a KeyedStateFunction to iterate over the keyed state, for all keys, and clear the state for any keys it wants to clean up.

    For details, see

    If you can tolerate some downtime, another solution would be to stop the job with a savepoint, use the state processor API to clear out the state for some of the keys, and then restart from the modified savepoint.
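
    As a rough illustration of the broadcast approach described above, here is a minimal sketch. The class name ClearableRequestFunction, the state name "pending-request", the use of String for both messages and keys, and the Boolean "clear everything" control event are all assumptions made for the example; they are not taken from the question's actual operator. The sketch looks up the state handle inside processElement only to stay independent of the open() signature, which differs between Flink versions; real code would normally acquire it once in open(). If the real descriptor has TTL enabled, the same descriptor instance (TTL config included) should be the one passed to applyToKeyedState.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical operator: keyed by a String correlation id, String messages in,
// Boolean control events on the broadcast side, String results out.
public class ClearableRequestFunction
        extends KeyedBroadcastProcessFunction<String, String, Boolean, String> {

    // Descriptor for the per-key state holding the pending request (assumed name).
    private static final ValueStateDescriptor<String> REQUEST_DESC =
            new ValueStateDescriptor<>("pending-request", String.class);

    @Override
    public void processElement(String message, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // Keyed side: a key context exists here, so the state is scoped to the current key.
        ValueState<String> pending = getRuntimeContext().getState(REQUEST_DESC);
        String request = pending.value();
        if (request == null) {
            // First message for this key: treat it as the request and remember it.
            pending.update(message);
        } else {
            // Second message for this key: treat it as the response, emit, and clean up.
            out.collect(request + " -> " + message);
            pending.clear();
        }
    }

    @Override
    public void processBroadcastElement(Boolean clearAll, Context ctx, Collector<String> out)
            throws Exception {
        // Broadcast side: no key context, but applyToKeyedState visits the state
        // registered under REQUEST_DESC for every key on this parallel instance.
        if (Boolean.TRUE.equals(clearAll)) {
            ctx.applyToKeyedState(REQUEST_DESC, (key, state) -> state.clear());
        }
    }
}
```

    To wire this in, the main stream would be keyed and then connected to the control stream, along the lines of events.keyBy(...).connect(control.broadcast(someMapStateDescriptor)).process(new ClearableRequestFunction()), where control is whatever stream carries the feature-flag change. Since every parallel instance receives the broadcast event, each one clears the keys it owns. The KeyedStateFunction could also inspect the key or the stored value and clear only selected entries instead of everything.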