apache-flink, flink-streaming

Does Flink always return the same partitioned state object for the same key?


Suppose we fill a state object inside a KeyedProcessFunction passed to KeyedStream#process, such as

    new KeyedProcessFunction<String, Rule, Rule>() {
        private MapState<String, ArrayList<Rule>> rulesState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // register the keyed map state with this operator
            rulesState = getRuntimeContext().getMapState(Descriptors.rulesPerCustomerDescriptor);
        }

        @Override
        public void processElement(Rule value, KeyedProcessFunction<String, Rule, Rule>.Context ctx, Collector<Rule> out) throws Exception {
            out.collect(value);
            // key and value arguments omitted in the original snippet
            rulesState.put();
        }
    });

and then try to read that state from a different function, such as

    public class MyRichFilterFunction extends RichFilterFunction<Transaction> {
        MapState<String, ArrayList<Rule>> rulesState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // use rules state variable to get the rules
            rulesState = getRuntimeContext().getMapState(RulesEvaluator.Descriptors.rulesPerCustomerDescriptor);
        }

        @Override
        public boolean filter(Transaction value) throws Exception {
            for (Map.Entry<String, ArrayList<Rule>> entry : rulesState.entries()) {
                for (Rule rule : entry.getValue()) {
                    // ......... (elided in the original)
                }
            }
            return true;
        }
    }

    // usage
    // fill the state
    rulesUpdateStream.process(new KeyedProcessFunction<String, Rule, Rule>()... // given above
    // use the filled state
    DataStream<Transaction> alerts = transactions.filter(new MyRichFilterFunction());

Will the second function get the same state object that was filled in KeyedStream#process if both streams are partitioned by the same key?


Solution

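  • No. State is local to the operator that registers it, so it cannot be accessed from any other operator. Even if the filter uses the same descriptor and its input is keyed by the same field, it gets its own, separate state rather than the entries written by the KeyedProcessFunction.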
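
To make the scoping concrete, here is a toy model in plain Java. It is not Flink's implementation and uses no Flink API: `ToyStateBackend`, `ToyStateDemo`, and the operator ids are hypothetical names invented for this sketch. It only illustrates the idea that keyed state is looked up by operator, state name, and key together, not by key alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical, not Flink code): state is addressed by
// (operator id, state name, key), so two operators never share an entry.
final class ToyStateBackend {
    // operator id -> state name -> key -> values
    private final Map<String, Map<String, Map<String, List<String>>>> store = new HashMap<>();

    // Returns the state list for one operator, one state name, and one key,
    // creating an empty list the first time that combination is requested.
    List<String> state(String operatorId, String stateName, String key) {
        return store
                .computeIfAbsent(operatorId, id -> new HashMap<>())
                .computeIfAbsent(stateName, name -> new HashMap<>())
                .computeIfAbsent(key, k -> new ArrayList<>());
    }
}

final class ToyStateDemo {
    public static void main(String[] args) {
        ToyStateBackend backend = new ToyStateBackend();

        // The "process" operator writes a rule for customer-1.
        backend.state("process-operator", "rulesPerCustomer", "customer-1").add("rule-A");

        // Same state name and same key, but a different operator: nothing is visible.
        List<String> seenByFilter =
                backend.state("filter-operator", "rulesPerCustomer", "customer-1");
        System.out.println(seenByFilter.isEmpty()); // prints "true"

        // The writing operator still sees its own state for that key.
        System.out.println(
                backend.state("process-operator", "rulesPerCustomer", "customer-1")); // prints "[rule-A]"
    }
}
```

In Flink itself, the usual way to let one function use data from both streams is to connect them and keep the state in a single operator, for example with a KeyedCoProcessFunction or the broadcast state pattern.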