java apache-flink flink-streaming flink-cep

Initializing state causes this error: "java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream'"


I am trying to initialize a ListState inside a CoProcessFunction, but it keeps throwing this error: "java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream'".

This is the code snippet that I used:

    dataStream1.keyBy(ele->ele.f1).connect(dataStream2).process(
        new CoProcessFunction<Tuple2<Long, String>, String, Object>() {
            ListState<Tuple2<Long, String>> stream1ListState;
            
            @Override
            public void open(Configuration parameters) throws Exception {
                LOG.info("inside CoProcess Function Constructor");
                ListStateDescriptor<Tuple2<Long,String>> listStateDescriptor = new
                    ListStateDescriptor<Tuple2<Long,String>>("type1",TypeInformation.of(new TypeHint<Tuple2<Long,String>>(){}));
                stream1ListState = getRuntimeContext().getListState(listStateDescriptor);
            }
            
            @Override
            public void processElement1(Tuple2<Long, String> longStringTuple2, Context context,
                Collector<Object> collector) throws Exception {
                LOG.info("inside stream1 processor");
                LOG.info(longStringTuple2.toString());
                collector.collect(longStringTuple2);
            }
            
            @Override
            public void processElement2(String s, Context context, Collector<Object> collector)
                throws Exception {
                LOG.info("inside stream2 processor");
                LOG.info(s);
            }
        }).print();

The line that throws the error is this one:

    stream1ListState = getRuntimeContext().getListState(listStateDescriptor);

The rest of the stack trace is as follows:

Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:232)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:202)

Solution

  • The error means that keyed state such as ListState or ValueState can only be used when the function runs on keyed input, for example inside a KeyedCoProcessFunction, which in turn requires both connected streams to be keyed. In the question only dataStream1 is keyed before connect(), so the state lookup in open() fails. If the second stream can also be keyed by some key, key it as well; otherwise you may want to use the Broadcast State pattern described in the Flink documentation.
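
A minimal sketch of the first option, keying both inputs before connecting them. It assumes a Flink 1.x DataStream API (where open(Configuration) is still available, as in the question) and that the String elements of the second stream can themselves serve as the key; the class names KeyedConnectSketch and BufferingFunction and the sample elements are made up for illustration.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedConnectSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample inputs (assumption): stand-ins for dataStream1 and dataStream2.
        DataStream<Tuple2<Long, String>> dataStream1 =
            env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));
        DataStream<String> dataStream2 = env.fromElements("a", "b");

        dataStream1
            .keyBy(ele -> ele.f1)                 // key stream 1 by its String field
            .connect(dataStream2.keyBy(s -> s))   // key stream 2 as well (assumed key)
            .process(new BufferingFunction())
            .print();

        env.execute("keyed-connect-sketch");
    }

    // Type parameters: key type, first input, second input, output.
    static class BufferingFunction
            extends KeyedCoProcessFunction<String, Tuple2<Long, String>, String, Object> {

        private transient ListState<Tuple2<Long, String>> stream1ListState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ListStateDescriptor<Tuple2<Long, String>> descriptor =
                new ListStateDescriptor<>(
                    "type1",
                    TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}));
            // Both inputs are keyed, so a keyed state store is available here.
            stream1ListState = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void processElement1(Tuple2<Long, String> value, Context ctx,
                Collector<Object> out) throws Exception {
            // Buffer the element under the current key and forward it.
            stream1ListState.add(value);
            out.collect(value);
        }

        @Override
        public void processElement2(String value, Context ctx,
                Collector<Object> out) throws Exception {
            // Emit whatever has been buffered so far for this key.
            Iterable<Tuple2<Long, String>> buffered = stream1ListState.get();
            if (buffered != null) {
                for (Tuple2<Long, String> element : buffered) {
                    out.collect(element.f1 + " seen on stream 2 as " + value);
                }
            }
        }
    }
}
```

Both keyBy() calls must produce the same key type (String here) so that matching elements from the two streams reach the same parallel instance and share the same state. If stream 2 has no natural key, this approach does not apply and the Broadcast State pattern is the alternative to look at.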