I'm currently developing an operator (a sink) that uses Flink's keyed state. The state backend is heap-based, and the state TTL is set to 24 hours. The operator works like this: first we receive a request and store something in a ValueState; then we receive the response and apply some logic to the request and response.
What I'm afraid of is that the state might grow much faster in the production environment than I expect. So I thought about adding a featureFlag property, so that I can disable the operator quickly if anything goes wrong.
As I understand it, Flink seems to dislike cases where you add stateful operators to, or remove them from, the job graph between one checkpoint and the next. So I modified my operator instead: if the property is true, the operator works as expected; if it's false, it returns immediately and skips its functionality.
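For reference, the setup described above might look roughly like the following sketch. It assumes the Flink 1.17/1.18-era DataStream API (StateTtlConfig.newBuilder(Time) and open(Configuration) are deprecated in later releases), a hypothetical Event type carrying a correlation key, and a hypothetical featureEnabled constructor argument standing in for the feature flag. For simplicity it emits the joined result instead of acting as a true sink.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: the stream is assumed to be keyed by Event.key; Event and featureEnabled are hypothetical.
public class RequestResponseFunction
        extends KeyedProcessFunction<String, RequestResponseFunction.Event, String> {

    /** Hypothetical input: either a request or a response for one correlation key. */
    public static class Event {
        public String key;
        public boolean isRequest;
        public String payload;

        public Event() {}

        public Event(String key, boolean isRequest, String payload) {
            this.key = key;
            this.isRequest = isRequest;
            this.payload = payload;
        }
    }

    private final boolean featureEnabled;
    private transient ValueState<String> pendingRequest;

    public RequestResponseFunction(boolean featureEnabled) {
        this.featureEnabled = featureEnabled;
    }

    /** Descriptor for the buffered request, with a 24-hour TTL. */
    static ValueStateDescriptor<String> pendingRequestDescriptor() {
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("pending-request", String.class);
        descriptor.enableTimeToLive(ttl);
        return descriptor;
    }

    @Override
    public void open(Configuration parameters) {
        pendingRequest = getRuntimeContext().getState(pendingRequestDescriptor());
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
        if (!featureEnabled) {
            return; // flag off: the operator stays in the job graph but does nothing
        }
        if (event.isRequest) {
            pendingRequest.update(event.payload); // buffer the request for this key
            return;
        }
        String request = pendingRequest.value();
        if (request != null) {
            out.collect(request + " -> " + event.payload); // placeholder for the real logic
            pendingRequest.clear(); // response handled; drop the buffered request
        }
    }
}
```

With the flag off, the operator simply stops touching its state; whatever was already buffered stays in the backend until the 24-hour TTL kicks in.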
That should work, but I have no idea how to clean up the state that has already been created if I decide to disable the featureFlag.
Because the state is keyed, I can't just call state.clear(): there is no key context. And I don't want to wait 24 hours with oversized state. I thought about changing the TTL config to, say, 1 minute instead of 24 hours when featureFlag is false, but I checked the source code and realized that if state for that descriptor already exists in the state backend, it is returned as-is, and the new TTL config is never used.
So, are there any ways to force-clean state by its state descriptor? Or can I only wait until TTL cleans up the state?
P.S. The question seems to be related to "How does one cleanup Flink stream state for inactive keys?", but I can't use the same approach because of
One option you haven't mentioned is as follows: instead of holding the keyed state in something like a KeyedProcessFunction, use a KeyedBroadcastProcessFunction. Then, when you want to clear some state, broadcast an event into that operator. In response to that incoming broadcast event, the processBroadcastElement method can call applyToKeyedState on the KeyedBroadcastProcessFunction.Context object that was passed in. This applyToKeyedState method takes a KeyedStateFunction, which is applied to the keyed state for every key, so it can clear the state for any keys it wants to clean up.
For details, see
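A minimal sketch of this approach, assuming the Flink 1.17/1.18-era API. The keyed input is a hypothetical Tuple2 of (key, payload) and the broadcast input is a plain String command; the "DISABLE" command, the "feature-flags" broadcast state and its "enabled" entry are illustrative names, not anything Flink defines. Here the broadcast state doubles as the feature flag, so one control event both switches the operator off and clears the existing state. The descriptor handed to applyToKeyedState should match the one used to register the state (name, type, and TTL config, if any); in this sketch both come from the same static field.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Keyed input: hypothetical (key, payload) pairs; broadcast input: control commands such as "DISABLE".
public class ClearableRequestFunction
        extends KeyedBroadcastProcessFunction<String, Tuple2<String, String>, String, String> {

    // Per-key state; the same descriptor is used for registration and for applyToKeyedState.
    static final ValueStateDescriptor<String> PENDING =
            new ValueStateDescriptor<>("pending-request", Types.STRING);

    // Broadcast state holding the feature flag; also pass it to controlStream.broadcast(FLAGS).
    static final MapStateDescriptor<String, Boolean> FLAGS =
            new MapStateDescriptor<>("feature-flags", Types.STRING, Types.BOOLEAN);

    private transient ValueState<String> pending;

    /** Function applied by applyToKeyedState once per key: it drops that key's state. */
    static KeyedStateFunction<String, ValueState<String>> clearEveryKey() {
        return new KeyedStateFunction<String, ValueState<String>>() {
            @Override
            public void process(String key, ValueState<String> state) {
                state.clear();
            }
        };
    }

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getState(PENDING);
    }

    @Override
    public void processElement(Tuple2<String, String> value, ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {
        Boolean enabled = ctx.getBroadcastState(FLAGS).get("enabled");
        if (Boolean.FALSE.equals(enabled)) {
            return; // feature switched off via the control stream
        }
        pending.update(value.f1); // the actual request/response logic is elided
    }

    @Override
    public void processBroadcastElement(String command, Context ctx,
                                        Collector<String> out) throws Exception {
        boolean enable = !"DISABLE".equals(command);
        ctx.getBroadcastState(FLAGS).put("enabled", enable);
        if (!enable) {
            // Visits every key that has state for PENDING on this instance and clears it.
            ctx.applyToKeyedState(PENDING, clearEveryKey());
        }
    }
}
```

Wiring it up might look like events.keyBy(t -> t.f0).connect(controlStream.broadcast(ClearableRequestFunction.FLAGS)).process(new ClearableRequestFunction()), where events and controlStream are hypothetical DataStream<Tuple2<String, String>> and DataStream<String> inputs. Since every parallel instance receives the broadcast element, each instance clears the keys it is responsible for.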
If you can tolerate some downtime, another solution would be to stop the job with a savepoint, use the state processor API to clear out the state for some of the keys, and then restart from the modified savepoint.
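A rough sketch of the savepoint route, assuming Flink 1.17 or a later 1.x release (where SavepointReader, SavepointWriter and OperatorIdentifier are available). The operator uid "request-response-sink", the "pending-request" descriptor and the shouldKeep rule are hypothetical placeholders. It reads the operator's keyed state, keeps only the entries that pass the filter, and rewrites them under the same uid. If the original state was created with TTL, the descriptor here would need the same TTL config. Also note that removeOperator drops all state of that operator, so anything not rewritten here (timers, other state) is lost.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.SavepointWriter;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class PruneSavepointState {

    static final String UID = "request-response-sink"; // hypothetical operator uid

    // Must match the original descriptor (name, type, and TTL config if one was used).
    static ValueStateDescriptor<String> descriptor() {
        return new ValueStateDescriptor<>("pending-request", String.class);
    }

    // Hypothetical rule deciding which keys keep their state.
    static boolean shouldKeep(String key) {
        return !key.startsWith("test-");
    }

    /** Emits (key, value) only for keys whose state should survive. */
    static class Reader extends KeyedStateReaderFunction<String, Tuple2<String, String>> {
        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(descriptor());
        }

        @Override
        public void readKey(String key, Context ctx,
                            Collector<Tuple2<String, String>> out) throws Exception {
            String value = state.value();
            if (value != null && shouldKeep(key)) {
                out.collect(Tuple2.of(key, value));
            }
        }
    }

    /** Writes the surviving entries back into keyed state. */
    static class Writer extends KeyedStateBootstrapFunction<String, Tuple2<String, String>> {
        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(descriptor());
        }

        @Override
        public void processElement(Tuple2<String, String> value, Context ctx) throws Exception {
            state.update(value.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        String oldPath = args[0]; // savepoint taken when stopping the job
        String newPath = args[1]; // where the pruned savepoint is written
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<Tuple2<String, String>> kept = SavepointReader
                .read(env, oldPath, new HashMapStateBackend())
                .readKeyedState(OperatorIdentifier.forUid(UID), new Reader());

        SavepointWriter
                .fromExistingSavepoint(env, oldPath, new HashMapStateBackend())
                .removeOperator(OperatorIdentifier.forUid(UID))
                .withOperator(OperatorIdentifier.forUid(UID),
                        OperatorTransformation.bootstrapWith(kept)
                                .keyBy(t -> t.f0)
                                .transform(new Writer()))
                .write(newPath);

        env.execute("prune keyed state");
    }
}
```

After this job finishes, the original job would be restarted from newPath instead of the savepoint it was stopped with.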