If we fill a state object inside a KeyedProcessFunction of a KeyedStream#process such as
new KeyedProcessFunction<String, Rule, Rule>() {
private MapState<String, ArrayList<Rule>> rulesState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
rulesState = getRuntimeContext().getMapState(Descriptors.rulesPerCustomerDescriptor);
}
@Override
public void processElement(Rule value, KeyedProcessFunction<String, Rule, Rule>.Context ctx, Collector<Rule> out) throws Exception {
out.collect(value);
rulesState.put();
}
});
public class MyRichFilterFunction extends RichFilterFunction<Transaction> {
MapState<String, ArrayList<Rule>> rulesState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// use rules state variable to get the rules
rulesState = getRuntimeContext().getMapState(RulesEvaluator.Descriptors.rulesPerCustomerDescriptor);
}
@Override
public boolean filter(Transaction value) throws Exception {
for (Map.Entry<String, ArrayList<Rule>> entry : rulesState.entries()) {
for (Rule rule : entry.getValue()) {
.........
}
}
return true;
}
}
// usage
// fill the state
rulesUpdateStream.process(new ProcessFunction<Rule, Rule>()... // given above
// use the filled state
DataStream<Transaction> alerts = transactions.filter(new MyRichFilterFunction());
Will we get the same state object from another KeyedStream#process method if they both partitioned with the same key?
No. State is local to the specific operator where the state is established. It cannot be accessed from anywhere else.