
Q. Apache Flink: how to get the current key during initializeState


I want to initialize value state from my own database (e.g. ES) if the application cannot restore state from the state backend. How can I get the current key during initializeState?

Here is the sample code:

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    KeyedStateStore stateStore = context.getKeyedStateStore();
    ValueStateDescriptor<PickUpState> pickUpStateConfig = new ValueStateDescriptor<>("pickUpState", PickUpState.class);

    ValueState<PickUpState> state = stateStore.getState(pickUpStateConfig);
    pickUpState = state;
    if(!context.isRestored()){
        // need the current key here
        String key =  ...

        PickUpState upState = initStateFromEs(key);
        state.update(upState);

    }
}

Any reply would be helpful, thanks!


Solution

  • That's not possible because there is no current key when initializeState is called.

    Each instance of a user function is multiplexed across many keys -- namely all of the keys in the key groups assigned to that task slot. initializeState is just called once, and it needs to perform whatever actions are needed for all of those keys. (And there is no way to determine which keys are relevant to a given instance.)

    The assumption is that the state backend is always available. The only time this isn't true is if the remote filesystem storing the snapshot is unavailable, in which case there's nothing you can do about it -- except for rebuilding the state from scratch. E.g., you could use the state processor API to build a new state snapshot from the data in the database.
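
To make the multiplexing described above concrete, here is a minimal plain-Java sketch of how keys are spread over parallel instances through key groups. It does not use any Flink classes. The class name `KeyGroupSketch` and its method names are hypothetical, and the hash step is a simplification: Flink applies a murmur hash to `key.hashCode()` before taking the modulus, whereas this sketch uses the raw hash code so the arithmetic is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of the Flink API.
public class KeyGroupSketch {

    // Map a key to a key group in [0, maxParallelism).
    // Simplification: Flink murmur-hashes key.hashCode() first.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Map a key group to the index of the parallel instance that owns it.
    // Each instance owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Group the given keys by the instance that would process them.
    static Map<Integer, List<String>> keysPerSubtask(
            List<String> keys, int parallelism, int maxParallelism) {
        Map<Integer, List<String>> result = new TreeMap<>();
        for (String key : keys) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            int subtask = subtaskFor(keyGroup, parallelism, maxParallelism);
            result.computeIfAbsent(subtask, s -> new ArrayList<>()).add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three sample keys, parallelism 2, 128 key groups.
        System.out.println(keysPerSubtask(List.of("a", "A", "0"), 2, 128));
    }
}
```

The mapping only runs from a key to an instance. An instance owns a range of key groups, but it cannot list the keys that will hash into that range before records arrive, which is why initializeState has no current key to work with.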