Below is the code:
// VerifyDuplicate.java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class VerifyDuplicate {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> dataStream = env.fromElements(
                Tuple3.of("ID_1", "subid_1", 1),
                Tuple3.of("ID_2", "subid_2", 2),
                Tuple3.of("ID_3", "subid_3", 3),
                Tuple3.of("ID_4", "subid_4", 4),
                Tuple3.of("ID_4", "subid_4", 4),
                Tuple3.of("ID_6", "subid_6", 6),
                Tuple3.of("ID_4", "subid_7", 7),
                Tuple3.of("ID_8", "subid_8", 8),
                Tuple3.of("ID_9", "subid_9", 9),
                Tuple3.of("ID_10", "subid_10", 10)
        );
        KeyedStream<Tuple3<String, String, Integer>, String> partitionedStream = dataStream.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0; // partition on f0 (id)
            }
        });
        partitionedStream.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f1; // subid
            }
        }).flatMap(new FilterDuplicate()).print();
        env.execute("Test");
    }
}

// FilterDuplicate.java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class FilterDuplicate extends RichFlatMapFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> {
    private ValueState<Boolean> seen;

    @Override
    public void open(Configuration configuration) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.seconds(15))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .cleanupFullSnapshot()
                .build();
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
        desc.enableTimeToLive(ttlConfig);
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Tuple3<String, String, Integer> value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
        if (!seen.value()) { // NullPointerException thrown here
            // we haven't seen the element yet
            out.collect(value);
            // set keyed state to true so that we don't emit elements with this key again
            seen.update(true);
        }
    }
}
Versions used:
<flink.version>1.17.1</flink.version>
<target.java.version>11</target.java.version>
I get a NullPointerException at !seen.value() in the flatMap() method. Why is seen.value() giving a NullPointerException? I checked with a seen == null condition before the problem line, but it still fails...
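The NullPointerException comes from auto-unboxing. For a key that has not been updated yet, seen.value() returns null, and !seen.value() has to unbox that null Boolean into a primitive boolean. Checking seen == null does not help, because the state handle itself is not null; only the value it holds for the current key is. To avoid the NullPointerException, just do:
    if (seen.value() == null) {
        out.collect(value);
        seen.update(true);
    }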
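The failure can be reproduced in plain Java without Flink, which shows it is a language issue rather than a state-backend one. In this sketch a null Boolean stands in for the value an unset ValueState returns; the class UnboxingDemo and its methods are hypothetical names used only for illustration.

```java
// Hypothetical plain-Java illustration; no Flink classes involved.
// A null Boolean plays the role of seen.value() before seen.update(true).
class UnboxingDemo {

    // Returns true if evaluating !value throws, i.e. the NPE from the question.
    static boolean negationThrows(Boolean value) {
        try {
            boolean ignored = !value; // auto-unboxing calls value.booleanValue()
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // The null-safe guard from the answer: an unset value means "not seen yet".
    static boolean isFirstSighting(Boolean seenValue) {
        return seenValue == null;
    }

    public static void main(String[] args) {
        Boolean unset = null;
        System.out.println(negationThrows(unset));         // true
        System.out.println(isFirstSighting(unset));        // true
        System.out.println(isFirstSighting(Boolean.TRUE)); // false
    }
}
```

An equivalent null-safe guard is !Boolean.TRUE.equals(seen.value()), which treats both null and false as "not seen".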
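Also, you have two keyBy() operations in a row. The partitioning of the first one is lost when you apply the second one, so the stream that reaches FilterDuplicate is keyed by subid alone, and the "seen" state is scoped to subid only. If you want to partition by id and sub-id, use a single keyBy() whose KeySelector returns a key formed from both of those fields of the tuple.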
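To illustrate how a single composite key drives the deduplication, the plain-Java sketch below models Flink's per-key state as a HashMap keyed by the (id, subid) pair and replays the question's ten tuples. The classes CompositeKeyDedupe and Event and the method dedupe are hypothetical stand-ins, and the model ignores TTL and parallelism. In a real job the equivalent would be one KeySelector returning, for example, Tuple2.of(value.f0, value.f1), with ValueState playing the role of the map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "single keyBy on (id, subid) + ValueState<Boolean> seen".
// Hypothetical illustration only: no Flink, no TTL, no parallelism.
class CompositeKeyDedupe {

    // Hypothetical stand-in for Tuple3<String, String, Integer>.
    static final class Event {
        final String id;
        final String subid;
        final int n;

        Event(String id, String subid, int n) {
            this.id = id;
            this.subid = subid;
            this.n = n;
        }

        @Override
        public String toString() {
            return "(" + id + "," + subid + "," + n + ")";
        }
    }

    // Emits each event whose (id, subid) pair has not been seen before.
    static List<Event> dedupe(List<Event> input) {
        // One "seen" flag per composite key, like per-key ValueState<Boolean>.
        Map<List<String>, Boolean> seen = new HashMap<>();
        List<Event> out = new ArrayList<>();
        for (Event e : input) {
            List<String> key = List.of(e.id, e.subid); // both fields form one key
            if (seen.get(key) == null) {               // unset state == first sighting
                out.add(e);
                seen.put(key, true);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("ID_1", "subid_1", 1), new Event("ID_2", "subid_2", 2),
                new Event("ID_3", "subid_3", 3), new Event("ID_4", "subid_4", 4),
                new Event("ID_4", "subid_4", 4), new Event("ID_6", "subid_6", 6),
                new Event("ID_4", "subid_7", 7), new Event("ID_8", "subid_8", 8),
                new Event("ID_9", "subid_9", 9), new Event("ID_10", "subid_10", 10));
        // The repeated (ID_4, subid_4) is dropped; (ID_4, subid_7) is kept.
        dedupe(input).forEach(System.out::println);
    }
}
```

Running main prints nine tuples: only the second (ID_4, subid_4, 4) is filtered out, because (ID_4, subid_7) forms a different composite key.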