Tags: java, apache-flink, flink-streaming

Flink keyBy vs RichParallelSourceFunction


I am learning Flink and trying to understand a few concepts. Here are a few questions:

  1. What's the difference between a keyBy operation on a stream and getting a source from RichParallelSourceFunction subclasses such as FlinkKinesisConsumer? Both operations divide the stream.
  2. I also tried to implement a very simple keyBy operator to understand it, as follows:

         DataStream input = env.fromElements("1", "2", "3", "4", "5", "6")
             .keyBy((KeySelector<String, Integer>) value -> Integer.parseInt(value) % 2);
         DataStream parsed = input.map(new MyMapper());
         DataStream parsedStr = input.map(new MyStrMapper());
         parsed.print();
         parsedStr.print();
         env.execute("myParser");

But the output I get is baffling:

3> 1
3> 2
3> 3
3> 4
3> 5
3> 6
3> I am 1
3> I am 2
3> I am 3
3> I am 4
3> I am 5
3> I am 6

That means everything was executed on subtask 3. Can someone help explain why?


Solution

  • (1) Differences between using keyBy and using a RichParallelSourceFunction?

    Every time you use keyBy, the stream records must go through serialization/deserialization and are likely to be sent across the network. A source instance, on the other hand, can be chained to the operations that follow it, meaning the stream records are simply passed along as objects on the Java heap.

    When you have multiple source instances for something like Kafka or Kinesis, they don't divide the stream. Each instance independently connects to the relevant brokers/servers and processes the records of the partitions/shards it has been assigned. Thus, with a RichParallelSourceFunction you can implement a much more performant pipeline, with less overhead for serialization/deserialization and networking.
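
As a rough illustration of "each instance handles its own partitions/shards", the sketch below lets every source subtask pick its shards with a simple modulo rule. The rule, the class name `ShardAssignmentSketch`, and the shard IDs are assumptions made for illustration; real connectors such as FlinkKinesisConsumer use their own assignment logic. The point is only that each parallel instance decides locally what it owns and reads it directly, so no record has to be reshuffled between instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ShardAssignmentSketch {

    /**
     * Returns the shards one source subtask would read.
     * Assumption: shard i belongs to subtask (i % parallelism). This is a
     * hypothetical rule, not the one any particular connector implements.
     */
    static List<String> shardsFor(int subtaskIndex, int parallelism, List<String> allShards) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < allShards.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(allShards.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Hypothetical shard names and parallelism.
        List<String> shards = Arrays.asList("shard-0", "shard-1", "shard-2", "shard-3", "shard-4");
        int parallelism = 2;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("source subtask " + subtask + " reads "
                    + shardsFor(subtask, parallelism, shards));
        }
    }
}
```

Every shard ends up with exactly one subtask, and each subtask can hand its records straight to a chained operator without a network hop.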

    (2) Why is everything going to subtask 3?

    The result of your KeySelector function is hashed, and those hashed values are taken mod 128 (assuming you haven't reconfigured the number of key groups) to map each key to a key group. Flink then determines which subtask is responsible for those key groups.

    Given that your key function can only return two distinct values (0 and 1), you could only ever see one or two distinct subtasks in use. Apparently 0 and 1 both hash to key groups that have been assigned to subtask 3.
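
The two-step routing described above (key to key group, key group to subtask) can be sketched in plain Java without a Flink dependency. This is a minimal sketch under stated assumptions: the `mix` function is a murmur-style stand-in and is not guaranteed to match Flink's internal hash bit for bit, the class and method names are hypothetical, and the parallelism of 4 is made up because the question does not state it. The printed subtask indices are therefore illustrative and need not match the `3>` prefix in the output above.

```java
import java.util.Set;
import java.util.TreeSet;

final class KeyGroupSketch {

    /** Murmur-style scrambling of a hash code (assumed stand-in for Flink's internal hash). */
    static int mix(int h) {
        h *= 0xcc9e2d51;
        h = Integer.rotateLeft(h, 15);
        h *= 0x1b873593;
        h = Integer.rotateLeft(h, 13);
        h = h * 5 + 0xe6546b64;
        h ^= 4;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo below is non-negative
    }

    /** Step 1: hash the key and take it modulo the number of key groups. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    /** Step 2: each subtask owns a contiguous range of key groups. */
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Collects the subtask indices that the keys 0..distinctKeys-1 would be routed to. */
    static Set<Integer> subtasksUsed(int distinctKeys, int parallelism, int maxParallelism) {
        Set<Integer> used = new TreeSet<>();
        for (int key = 0; key < distinctKeys; key++) {
            used.add(subtaskFor(keyGroupFor(key, maxParallelism), parallelism, maxParallelism));
        }
        return used;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // the default number of key groups mentioned above
        int parallelism = 4;      // hypothetical value
        for (int keys : new int[] {2, 16, 1000}) {
            System.out.println(keys + " distinct keys -> subtasks "
                    + subtasksUsed(keys, parallelism, maxParallelism));
        }
    }
}
```

With only two distinct keys, at most two entries can ever appear in the set, whatever the parallelism. In a real job, `KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)` from Flink's runtime reports the actual subtask index for a key.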

    Whenever possible, it's best to have a key space that is significantly larger than the parallelism of the cluster.

    References

    For more insight, see my answers to these questions: