Tags: java, nullpointerexception, apache-flink, flink-streaming

Flink flatMap() - NullPointerException


Below is the code:

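// VerifyDuplicate.java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class VerifyDuplicate {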
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple3<String, String, Integer>> dataStream = env.fromElements(
            Tuple3.of("ID_1", "subid_1", 1),
            Tuple3.of("ID_2", "subid_2", 2),
            Tuple3.of("ID_3", "subid_3", 3),
            Tuple3.of("ID_4", "subid_4", 4),
            Tuple3.of("ID_4", "subid_4", 4),
            Tuple3.of("ID_6", "subid_6", 6),
            Tuple3.of("ID_4", "subid_7", 7),
            Tuple3.of("ID_8", "subid_8", 8),
            Tuple3.of("ID_9", "subid_9", 9),
            Tuple3.of("ID_10", "subid_10", 10)
            );

        KeyedStream<Tuple3<String, String, Integer>, String> partitionedStream = dataStream.keyBy(new KeySelector<Tuple3<String, String,Integer>, String>() {
                @Override
                public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                    return value.f0; // partition on f0
                }
            });


        partitionedStream.keyBy(new KeySelector<Tuple3<String, String,Integer>, String>() {
                @Override
                public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                    return value.f1; // subid
                }
            }).flatMap(new FilterDuplicate()).print();

        env.execute("Test");
    }
}

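// FilterDuplicate.java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class FilterDuplicate extends RichFlatMapFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> {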

    private ValueState<Boolean> seen;

    @Override
    public void open(Configuration configuration) {
        StateTtlConfig ttlConfig = StateTtlConfig
          .newBuilder(Time.seconds(15))
          .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
          .cleanupFullSnapshot()
          .build();
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
        desc.enableTimeToLive(ttlConfig);
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Tuple3<String, String, Integer> value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
        
        if (!seen.value()) { // nullpointerexception
            // we haven't seen the element yet
            out.collect(value);
            // set operator state to true so that we don't emit elements with this key again
            seen.update(true);
        }
    }
}

Versions (pom.xml properties):

  <flink.version>1.17.1</flink.version>
  <target.java.version>11</target.java.version>

The NullPointerException is thrown at !seen.value() in the flatMap() method.

Why does seen.value() throw a NullPointerException? I added a seen == null check before the failing line, but it still fails.


Solution

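    1. For the NullPointerException: ValueState.value() returns null for a key whose state has never been written (your descriptor has no default value), and !seen.value() auto-unboxes that null Boolean, which throws. Checking seen == null does not help, because seen is the state handle, which is non-null after open(); it is the value inside it that is null. Check the value instead: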
        if (seen.value() == null) {
            // first element for this key: emit it and remember that we saw it
            out.collect(value);
            seen.update(true);
        }
    
    2. You have two keyBy() operations in a row. The second keyBy() replaces the partitioning of the first, so the stream ends up keyed by sub-id alone and seen is tracked per sub-id only. If you want to deduplicate on id and sub-id together, use a single keyBy() whose KeySelector returns a key built from both fields of the tuple.
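Both points can be reproduced outside Flink with a minimal plain-Java sketch. It is a stand-in, not Flink code: a HashMap plays the role of the per-key ValueState<Boolean> (a missing entry returns null, as value() does for a key that was never written), and the hypothetical compositeKey helper joins id and sub-id the way a single keyBy() on both fields would. TTL expiry is not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {

    // Hypothetical stand-in for Flink's keyed ValueState<Boolean>:
    // get() returns null for a key that was never written,
    // like ValueState.value() when no default value is configured.
    private final Map<String, Boolean> seenByKey = new HashMap<>();

    // Hypothetical helper: one key built from both fields (id and sub-id),
    // mirroring a single keyBy() that returns a composite key.
    public static String compositeKey(String id, String subId) {
        return id + "|" + subId;
    }

    // Returns true only the first time an (id, subId) pair is seen.
    public boolean firstOccurrence(String id, String subId) {
        String key = compositeKey(id, subId);
        Boolean seen = seenByKey.get(key);
        // Writing "if (!seen)" here would auto-unbox null and throw
        // NullPointerException on the first element of every key.
        if (seen == null) {
            seenByKey.put(key, true);
            return true;
        }
        return false;
    }

    // Reproduces the original failure mode: negating a null Boolean.
    public static boolean unboxingNullThrows() {
        Boolean missing = null;
        try {
            boolean negated = !missing; // auto-unboxing null throws here
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String[][] input = {
            {"ID_1", "subid_1"}, {"ID_4", "subid_4"}, {"ID_4", "subid_4"},
            {"ID_4", "subid_7"}, {"ID_8", "subid_8"}
        };
        DedupSketch dedup = new DedupSketch();
        List<String> emitted = new ArrayList<>();
        for (String[] t : input) {
            if (dedup.firstOccurrence(t[0], t[1])) {
                emitted.add(t[0] + "," + t[1]);
            }
        }
        System.out.println(emitted);
        System.out.println("unboxing null throws NPE: " + unboxingNullThrows());
    }
}
```

Running main keeps the first ID_4,subid_4 and drops its repeat, while ID_4,subid_7 is still emitted because it is a different composite key. The string join is only for illustration (it could collide if an id contained "|"); in the Flink job itself, a KeySelector returning Tuple2.of(value.f0, value.f1) as a Tuple2<String, String> key is one way to express the same composite key. Note also that once the 15-second TTL expires, value() reads back as null again under the default NeverReturnExpired visibility, so the same null check covers expired state.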