I am learning Flink and trying to understand a few concepts. Here are a few questions:
What is the difference between a keyBy operation on a stream and getting a source from RichParallelSourceFunction children like FlinkKinesisConsumer? Both operations divide the stream.
DataStream input = env.fromElements("1", "2", "3", "4", "5", "6")
    .keyBy((KeySelector<String, Integer>) value -> Integer.parseInt(value) % 2);
DataStream parsed = input.map(new MyMapper());
DataStream parsedStr = input.map(new MyStrMapper());
parsed.print();
parsedStr.print();
env.execute("myParser");
But the output I get is baffling:
3> 1
3> 2
3> 3
3> 4
3> 5
3> 6
3> I am 1
3> I am 2
3> I am 3
3> I am 4
3> I am 5
3> I am 6
That means everything executed on subtask 3. Can someone help explain why?
(1) Differences between using keyBy and using a RichParallelSourceFunction?
Every time you use keyBy, the stream records must go through serialization/deserialization and are likely to be sent across the network. On the other hand, a source instance can be chained to subsequent operations, meaning the stream records are simply passed along as objects on the Java heap.
When you have multiple source instances for something like Kafka or Kinesis, they don't divide the stream. Each instance independently connects to the relevant brokers/servers to process the records for the partitions/shards it has been assigned to handle. Thus, with a RichParallelSourceFunction you can implement a much more performant pipeline, with less overhead for serialization/deserialization and networking.
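To make the "each instance handles its own partitions" idea concrete, here is a minimal plain-Java sketch. It assumes a simple modulo assignment; the class name SourceAssignmentSketch and the method partitionsFor are hypothetical, and the real Kafka and Kinesis connectors use their own assignment logic. The point is only that every source instance can work out its share locally, so no record has to be reshuffled between instances.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: a hypothetical modulo scheme, not the connectors' actual code.
class SourceAssignmentSketch {

    // Each parallel source instance computes, on its own, which partitions it owns.
    static List<Integer> partitionsFor(int subtaskIndex, int parallelism, int numPartitions) {
        List<Integer> mine = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            if (partition % parallelism == subtaskIndex) {
                mine.add(partition);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        int parallelism = 3;   // assumed number of source instances
        int numPartitions = 8; // assumed number of partitions/shards
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("source instance " + subtask + " reads partitions "
                    + partitionsFor(subtask, parallelism, numPartitions));
        }
    }
}
```

Every partition is read by exactly one instance, and the records it produces can be handed straight to chained operators in the same task.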
(2) Why is everything going to subtask 3?
The result of your KeySelector function is hashed, and those hashed values are taken mod 128 (assuming you haven't reconfigured the number of key groups) to map each key to a key group. Flink then determines which subtask is responsible for those key groups.
Given that your key function can only return two distinct values (0 and 1), you were only ever going to see one or two distinct subtasks in use. Apparently 0 and 1 both hash to key groups that have been assigned to subtask 3.
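The two-step mapping (key to key group, key group to subtask) can be sketched in plain Java as follows. The structure is modeled on Flink's KeyGroupRangeAssignment, but the mix function is only a stand-in for Flink's internal murmur-style hash, and the parallelism of 4 is an assumption since the question does not state it. The concrete subtask numbers this prints may therefore differ from what a real job shows.

```java
// Illustration only: mirrors the shape of Flink's key-group logic, not its exact hash.
class KeyGroupSketch {

    // Stand-in for Flink's internal hash (an assumption; real values may differ).
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // force a non-negative result
    }

    // Step 1: hash the key's hashCode and take it mod the number of key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Step 2: each subtask owns a contiguous range of key groups.
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // default number of key groups
        int parallelism = 4;      // assumed; not given in the question
        for (String value : new String[] {"1", "2", "3", "4", "5", "6"}) {
            Integer key = Integer.parseInt(value) % 2; // same logic as the KeySelector
            System.out.println("value " + value + " -> key " + key
                    + " -> key group " + keyGroupFor(key, maxParallelism)
                    + " -> subtask index " + subtaskFor(key, maxParallelism, parallelism));
        }
    }
}
```

Whatever the hash is, six input values collapse to two keys, so at most two key groups (and at most two subtasks) can ever receive data.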
Whenever possible, it's best to have a key space that is significantly larger than the parallelism of the cluster.
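A rough way to see the effect of key-space size is to count how many subtasks receive any data at all. The sketch below assumes 128 key groups and a parallelism of 8, and uses a stand-in hash rather than Flink's real one, so treat the counts as indicative only; the class name KeySpaceSketch is hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

// Illustration only: counts how many subtasks are hit for a given number of distinct keys.
class KeySpaceSketch {

    // Stand-in for Flink's internal hash (an assumption; real values may differ).
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // force a non-negative result
    }

    static int subtaskFor(int key, int maxParallelism, int parallelism) {
        int keyGroup = mix(Integer.hashCode(key)) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Collect the set of subtask indexes that keys 0..distinctKeys-1 map to.
    static Set<Integer> subtasksUsed(int distinctKeys, int maxParallelism, int parallelism) {
        Set<Integer> used = new TreeSet<>();
        for (int key = 0; key < distinctKeys; key++) {
            used.add(subtaskFor(key, maxParallelism, parallelism));
        }
        return used;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // default number of key groups
        int parallelism = 8;      // assumed for illustration
        for (int distinctKeys : new int[] {2, 16, 1000}) {
            System.out.println(distinctKeys + " distinct keys -> subtasks used: "
                    + subtasksUsed(distinctKeys, maxParallelism, parallelism));
        }
    }
}
```

With only two keys, most subtasks sit idle no matter how the hash falls; with many keys, the load has a chance to spread across all of them.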