Tags: apache-kafka, parallel-processing, apache-flink, partitioning, kafka-topic

Apache Flink - Partitioning the stream equally as the input Kafka topic


I would like to implement in Apache Flink the following scenario:

Scenario

Given a Kafka topic with 4 partitions, I would like to process the data within each partition independently in Flink, applying different business logic depending on the event's type.

In particular, suppose the input Kafka topic contains four kinds of events, one per partition, and each event has a different structure: partition 1 has the field "a" as key, partition 2 has the field "b" as key, and so on. In Flink I would like to apply different business logic depending on the event, so I thought I should split the stream in some way. To achieve this, I thought of doing something like the following, using just one consumer (I don't see why I should use more):

FlinkKafkaConsumer<..> consumer = ...
DataStream<..> stream = flinkEnv.addSource(consumer);

stream.keyBy("a").map(new AEventMapper()).addSink(...);
stream.keyBy("b").map(new BEventMapper()).addSink(...);
stream.keyBy("c").map(new CEventMapper()).addSink(...);
stream.keyBy("d").map(new DEventMapper()).addSink(...);

(a) Is this correct? Also, I would like to process each Flink partition in parallel, since I only care about processing events in order within the same Kafka partition, not globally: (b) how can I do that? I know about the setParallelism() method, but I don't know where to apply it in this scenario.

I'm looking for answers to the questions marked (a) and (b). Thank you in advance.


Solution

  • If you can build it like this, it will perform better:

    [diagram: job parallelism equal to the number of Kafka partitions, with each partition flowing through its own chained source → map → sink pipeline]

    Specifically, what I'm proposing is

    1. Set the parallelism of the entire job to exactly match the number of Kafka partitions. Then each FlinkKafkaConsumer instance will read from exactly one partition.

    2. If possible, avoid using keyBy, and avoid changing the parallelism. Then the source, map, and sink will all be chained together (this is called operator chaining), and no serialization/deserialization and no networking will be needed (within Flink). Not only will this perform well, but you can also take advantage of fine-grained recovery (streaming jobs that are embarrassingly parallel can recover one failed task without interrupting the others).

    3. You can write a general-purpose EventMapper that checks what type of event is being processed and then does whatever is appropriate. Or you can try to be clever and implement a RichMapFunction that figures out in its open() which partition is being handled, and loads the appropriate mapper. (A minimal sketch of the first approach follows below.)
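
    For reference, here is a minimal sketch that puts points 1, 2 and 3 together, using the same legacy FlinkKafkaConsumer as the question. The topic name, broker address, group id, and the string-based type check are placeholders; a real job would parse the events and plug in its own business logic and sinks.

    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class PerPartitionJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // (1) Job parallelism = number of Kafka partitions, so each source
            //     subtask reads exactly one partition.
            env.setParallelism(4);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.setProperty("group.id", "per-partition-job");       // placeholder group id

            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);

            DataStream<String> stream = env.addSource(consumer);

            // (2) No keyBy and no parallelism change, so source -> map -> sink are
            //     chained and per-partition ordering is preserved.
            stream.map(new GeneralEventMapper())
                  .print(); // stand-in sink; replace with the real sink(s)

            env.execute("per-partition processing");
        }

        // (3) One general-purpose mapper that dispatches on the event's shape
        //     instead of one keyed branch per event type.
        public static class GeneralEventMapper implements MapFunction<String, String> {
            @Override
            public String map(String rawEvent) {
                // Crude dispatch for illustration only; real code would parse the
                // event and apply the matching business logic.
                if (rawEvent.contains("\"a\":")) {
                    return "A-logic: " + rawEvent;
                } else if (rawEvent.contains("\"b\":")) {
                    return "B-logic: " + rawEvent;
                } else if (rawEvent.contains("\"c\":")) {
                    return "C-logic: " + rawEvent;
                } else {
                    return "D-logic: " + rawEvent;
                }
            }
        }
    }

    Because the mapper and sink keep the source's parallelism and there is no keyBy, each of the four chains handles exactly one Kafka partition, in order, independently of the others.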