
Apache Flink consecutive splits strange behaviour


I believe that Flink behaves oddly when two splits are done consecutively. I may have a mistake in my implementation logic, which is why I am posting here to ask for your opinion.

Minimal example: I have a text file containing the words Apple, Banana and Orange, which I pass to the stream execution environment as the source. In the first split, the select condition checks whether the argument is the word "Apple": if so, I put it in the "topic" Apples, otherwise in the "topic" NotApples. I then select the "topic" NotApples from this split stream and split it again, this time checking whether the argument is the word "Orange": if so, it is placed in the "topic" Oranges, otherwise in the "topic" NotOranges.

What I expect at the end, when I print the NotOranges topic of the last split stream, is that only the word "Banana" is printed. However, both "Apple" and "Banana" are printed. It seems that when the second split is done, the stream it processes is not the one containing only the elements of the topic I selected (i.e., NotApples) but all the elements. Am I missing something?
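For reference, the intended routing can be modeled with plain Java collections, independent of Flink. This is only a sketch of the expected semantics: the class `SplitSemantics` and its methods are hypothetical, and `Collectors.partitioningBy` stands in for a split with two named outputs.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SplitSemantics {

    // Partition the input into a "true" and a "false" bucket, like a split
    // with two named outputs (e.g. Apples / NotApples).
    static Map<Boolean, List<String>> split(List<String> input, Predicate<String> p) {
        return input.stream().collect(Collectors.partitioningBy(p));
    }

    // The second split only sees the "false" bucket (NotApples) of the first.
    static List<String> notApplesAndNotOranges(List<String> words) {
        List<String> notApples = split(words, w -> w.equals("Apple")).get(false);
        return split(notApples, w -> w.equals("Orange")).get(false);
    }

    public static void main(String[] args) {
        // prints [Banana]
        System.out.println(notApplesAndNotOranges(List.of("Apple", "Banana", "Orange")));
    }
}
```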

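import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Note: split()/select() were deprecated and removed in Flink 1.12,
// so this only compiles against older Flink releases.
public class SplitTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> datastream = env.readTextFile("input.txt");

        // First split: "Apple" goes to "Apples", everything else to "NotApples".
        SplitStream<String> splitStream1 = datastream.split(new OutputSelector<String>() {
            @Override
            public Iterable<String> select(String value) {
                List<String> output = new ArrayList<String>();
                if (value.equals("Apple")) {
                    output.add("Apples");
                } else {
                    output.add("NotApples");
                }
                return output;
            }
        });

        // Intended: split only the "NotApples" selection a second time.
        DataStream<String> notApplesStream = splitStream1.select("NotApples");
        SplitStream<String> splitStream2 = notApplesStream.split(new OutputSelector<String>() {
            @Override
            public Iterable<String> select(String value) {
                List<String> output = new ArrayList<String>();
                if (value.equals("Orange")) {
                    output.add("Oranges");
                } else {
                    output.add("NotOranges");
                }
                return output;
            }
        });

        DataStream<String> notApplesAndNotOrangesStream = splitStream2.select("NotOranges");
        notApplesAndNotOrangesStream.print();
        env.execute("SplitTest");
    }
}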

Output:

1> Apple
1> Apple
1> Banana
2> Apple
2> Apple
2> Apple
4> Apple
4> Apple
4> Banana
3> Apple
3> Banana
3> Apple

NB: I know that I could implement the same logic with a single split (checking whether the argument is "Apple" OR "Orange"). However, that is not the point of my question. I first noticed this behaviour in a more complicated program I've written, where two consecutive splits are necessary, so I tried to recreate it in a minimal example to check whether I could reproduce it.


Solution

  • There was a recent discussion about this incorrect behavior on the mailing list, with the subject "About Deprecating split/select for DataStream API". I think the key comment was:

    First, we must admit that the current implementation for split/select is flawed. I roughly went through the source codes, the problem may be that for consecutive select/split(s), the former one will be overridden by the later one during StreamGraph generation phase. That's why we forbid this consecutive logic in FLINK-11084.

    After looking at FLINK-11084 and the resulting patch, I believe recent releases of Flink will throw an exception if you do two consecutive split/selects.
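The commonly recommended replacement for split/select is side outputs. Below is a minimal sketch of the same two-stage routing using a `ProcessFunction` and `OutputTag`; the class `SideOutputSplitTest` and the helpers `routeAway` and `notApplesAndNotOranges` are hypothetical names, and `env.fromElements(...)` stands in for the input file. Each stage sends the matching word to a side output and everything else to its main output, so the second stage only ever receives the NotApples elements.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitTest {

    // The anonymous subclasses ({}) preserve the generic type for Flink's type extraction.
    static final OutputTag<String> APPLES = new OutputTag<String>("Apples") {};
    static final OutputTag<String> ORANGES = new OutputTag<String>("Oranges") {};

    // Elements equal to `word` go to the side output `tag`; all other
    // elements go to the main output, which plays the role of "Not...".
    static SingleOutputStreamOperator<String> routeAway(
            DataStream<String> input, String word, OutputTag<String> tag) {
        return input.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.equals(word)) {
                    ctx.output(tag, value);
                } else {
                    out.collect(value);
                }
            }
        });
    }

    // Two consecutive stages: the second one is fed only the first stage's main output.
    static DataStream<String> notApplesAndNotOranges(DataStream<String> input) {
        SingleOutputStreamOperator<String> notApples = routeAway(input, "Apple", APPLES);
        return routeAway(notApples, "Orange", ORANGES);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.fromElements("Apple", "Banana", "Orange");
        notApplesAndNotOranges(words).print(); // expected to print only Banana
        env.execute("SideOutputSplitTest");
    }
}
```

The "Apples" and "Oranges" streams remain reachable via `getSideOutput(APPLES)` or `getSideOutput(ORANGES)` on the operator returned by the corresponding `routeAway` call, so each stage keeps both branches without relying on split/select.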