I am trying to initialize a ListState inside a CoProcessFunction, but it keeps throwing this error: "java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream'".
This is the code snippet that I used:
dataStream1.keyBy(ele -> ele.f1).connect(dataStream2).process(
    new CoProcessFunction<Tuple2<Long, String>, String, Object>() {
        ListState stream1ListState;

        @Override
        public void open(Configuration parameters) throws Exception {
            LOG.info("inside CoProcess Function Constructor");
            ListStateDescriptor<Tuple2<Long, String>> listStateDescriptor =
                new ListStateDescriptor<Tuple2<Long, String>>(
                    "type1", TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}));
            stream1ListState = getRuntimeContext().getListState(listStateDescriptor);
        }

        @Override
        public void processElement1(Tuple2<Long, String> longStringTuple2, Context context,
                Collector<Object> collector) throws Exception {
            LOG.info("inside stream1 processor");
            LOG.info(longStringTuple2.toString());
            collector.collect(longStringTuple2);
        }

        @Override
        public void processElement2(String s, Context context, Collector<Object> collector)
                throws Exception {
            LOG.info("inside stream2 processor");
            LOG.info(s);
        }
    }).print();
The line that throws the error is this one:
stream1ListState = getRuntimeContext().getListState(listStateDescriptor);
The rest of the stack trace is as follows:
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:232)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:202)
The error means that keyed state such as ListState or ValueState can only be used on a keyed stream. In your code only dataStream1 is keyed, but after connect() both inputs have to be keyed for keyed state to be available, which is also what KeyedCoProcessFunction requires. So, if the second stream can be keyed by some key, you can key it as well before connecting. Otherwise, you may want to look at the Broadcast State pattern described in the Flink documentation.
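As a rough sketch of the first option, here is what keying both streams could look like. This assumes the Flink 1.x DataStream API (where open(Configuration) is still available) and that the String elements of dataStream2 can serve directly as the key matching ele.f1. The class names KeyedConnectSketch and BufferingFunction and the fromElements inputs are placeholders of mine, not part of your job.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedConnectSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder inputs; the real sources are not shown in the question.
        DataStream<Tuple2<Long, String>> dataStream1 =
                env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));
        DataStream<String> dataStream2 = env.fromElements("a", "c");

        dataStream1
                .keyBy(ele -> ele.f1)
                // Assumption: the String itself is the key of the second stream.
                .connect(dataStream2.keyBy(s -> s))
                .process(new BufferingFunction())
                .print();

        env.execute("keyed-connect-sketch");
    }

    // Type parameters: key type, first input, second input, output.
    static class BufferingFunction
            extends KeyedCoProcessFunction<String, Tuple2<Long, String>, String, Object> {

        private transient ListState<Tuple2<Long, String>> stream1ListState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ListStateDescriptor<Tuple2<Long, String>> descriptor = new ListStateDescriptor<>(
                    "type1", TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}));
            // Both inputs are keyed, so the keyed state store is available here.
            stream1ListState = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void processElement1(Tuple2<Long, String> value, Context ctx,
                Collector<Object> out) throws Exception {
            // The state is scoped to the current key (value.f1).
            stream1ListState.add(value);
            out.collect(value);
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<Object> out)
                throws Exception {
            // Elements buffered under the same key are reachable via stream1ListState here.
            out.collect(value);
        }
    }
}
```

Both key selectors have to produce the same key type (String here), since the state for a given key is shared between processElement1 and processElement2.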