
KeyBy is not creating different keyed streams for different keys


I'm reading a simple JSON string as input and keying the stream on two fields, A and B. I expected every distinct combination of A and B to get its own keyed stream, but keyBy puts records with the same A and different values of B into the same keyed stream.

The input:

{
    "A": "352580084349898",
    "B": "1546559127",
    "C": "A"
}

This is the core logic of my Flink code:

// Parse each incoming JSON string into a GenericDataObject.
DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
        .map(new MapFunction<String, GenericDataObject>() {
            @Override
            public GenericDataObject map(String s) throws Exception {
                JSONObject jsonObject = new JSONObject(s);
                GenericDataObject genericDataObject = new GenericDataObject();
                genericDataObject.setA(jsonObject.getString("A"));
                genericDataObject.setB(jsonObject.getString("B"));
                genericDataObject.setC(jsonObject.getString("C"));
                return genericDataObject;
            }
        });
// Key by the combination of fields A and B, then pass each record through unchanged.
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
        .keyBy("A", "B")
        .map(new MapFunction<GenericDataObject, GenericDataObject>() {
            @Override
            public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
                return genericDataObject;
            }
        });
testStream.print();

GenericDataObject is a POJO with three fields: A, B and C.

And this is the console output for different values of field B.

5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}

Notice lines 1 and 2. Even though they have different values of B, they are being put in the same keyed stream (5). I must be doing something fundamentally wrong here, can someone please point me in the right direction?


Solution

  • Firstly, you're doing nothing wrong.

    Why are they in the same subtask?

    The number before ">" in the printed output (5, 4, 3) is the index of the parallel subtask that emitted the record, not an identifier of a key. Suppose you have thousands of keys: it is impractical for Apache Flink to create a separate thread for each of them. There has to be another mechanism that lets one thread handle a group of keys while still treating each key separately.

    For this reason, Apache Flink assigns every key to a key group, and every subtask owns a set of key groups. Different keys that fall into key groups owned by the same subtask are handled by that subtask. A subtask therefore usually handles several keys, and keyed state is scoped to the individual key, so the state of different keys stays separate.

    keyBy does not guarantee that different keys are assigned to different subtasks (or partitions); it only guarantees that all records with the same key are assigned to the same subtask. The only control you have over which records are grouped together is the key itself, which you can define by implementing a KeySelector.
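
The routing described above can be sketched in plain Java, without a Flink dependency. This is a simplified model, not Flink's implementation: the class KeyGroupSketch, its methods, and the mix() hash are hypothetical stand-ins, and the values 128 (maximum parallelism) and 8 (parallelism) are only illustrative. It shows the two steps: a key is hashed into one of a fixed number of key groups, and each subtask owns a contiguous range of key groups. With more keys than subtasks, some distinct keys must share a subtask.

```java
import java.util.Arrays;

// Simplified model of how a keyed record could be routed to a parallel subtask.
// Assumption: mix() is a stand-in hash, not the hash function Flink actually uses,
// so the subtask numbers printed here will not match a real Flink job.
class KeyGroupSketch {

    // Stand-in bit mixer (hypothetical) that spreads hashCode() values.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Step 1: key -> key group. The number of key groups equals the maximum parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    // Step 2: key group -> subtask. Each subtask owns a contiguous range of key groups.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    // Composite key built from fields A and B (stand-in for the tuple key of keyBy("A", "B")).
    static Object compositeKey(String a, String b) {
        return Arrays.asList(a, b);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // illustrative value
        int parallelism = 8;      // illustrative value
        String a = "352580084349898";
        String[] bValues = {"1546559224", "1546559127", "1546559234", "1546559254"};

        for (String b : bValues) {
            Object key = compositeKey(a, b);
            System.out.println("B=" + b
                    + " keyGroup=" + keyGroupFor(key, maxParallelism)
                    + " subtask=" + subtaskFor(key, maxParallelism, parallelism));
        }
    }
}
```

Calling subtaskFor twice with an equal key always yields the same subtask, which is the guarantee keyBy gives. Nothing in the scheme prevents two different keys from yielding the same subtask, which is what the output in the question shows.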

    For more details, see the article "A Deep Dive into Rescalable State in Apache Flink" on the official Apache Flink website.