I am developing a DataStream-based Flink application for a high-volume streaming use case (tens of millions of events per second). The data is consumed from a Kafka topic and is already sharded according to a certain key. My intention is to create key-specific states on the Flink side to run custom analytics. The main problem that I can't wrap my head around is how to create the keyed states without the reshuffling of the incoming data that is imposed by keyBy().
I can guarantee that the maximum parallelism of the Flink job will be less than or equal to the number of partitions in the source Kafka topic, so logically the shuffling is not necessary. The answer to this StackOverflow question suggests that it may be possible to write the data into Kafka in a way that is compatible with the expectations of Flink and then use reinterpretAsKeyedStream(). I would be happy to do that for this application. Would someone be able to share the necessary steps?
Thank you in advance.
What you need to do is to ensure that each event is written to the Kafka partition that will be read by the same task slot to which the key for that event will be assigned.
Here's what you need to know to make that work:
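(1) Kafka partitions are assigned in round-robin fashion to the parallel source instances (task slots). With the Kafka connector's default assignment, the starting slot is derived from a hash of the topic name: partition 0 goes to some slot s, partition 1 to slot s+1, and so on, wrapping around modulo the source parallelism when there are more partitions than slots.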
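As a sketch of point (1), assuming the connector's default assignment (the formula below is my reading of KafkaTopicPartitionAssigner in the legacy FlinkKafkaConsumer and of the split assignment in the newer KafkaSource; check it against the connector version you actually use), the slot that reads a given partition can be computed like this. The topic name "events" is hypothetical.

```java
// Sketch: which parallel source subtask ("slot") reads a given Kafka partition.
// Assumption: mirrors the Flink Kafka connector's default assignment; verify for your version.
public class PartitionAssignment {

    static int subtaskForPartition(String topic, int partition, int parallelism) {
        // The round-robin starts at an offset derived from the topic name's hash...
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        // ...and consecutive partitions go to consecutive subtasks, modulo the parallelism.
        return (startIndex + partition) % parallelism;
    }

    public static void main(String[] args) {
        String topic = "events"; // hypothetical topic name
        int parallelism = 4;
        for (int partition = 0; partition < 8; partition++) {
            System.out.println("partition " + partition + " -> subtask "
                    + subtaskForPartition(topic, partition, parallelism));
        }
    }
}
```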
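(2) Keys are mapped to key groups, and key groups are assigned to slots. The number of key groups equals the maximum parallelism, which is a configuration parameter (e.g. set via env.setMaxParallelism()). If you don't set it, Flink derives a default from the parallelism that is never lower than 128; since the producer has to use exactly the same value, it is safer to set it explicitly.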
The key group for a key is computed (with integer arithmetic, as in Flink's KeyGroupRangeAssignment) via
keyGroupId = MathUtils.murmurHash(key.hashCode()) % maxParallelism
and then the slot (operator subtask index) that owns that key group is
slotIndex = keyGroupId * actualParallelism / maxParallelism
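The two formulas above can be sketched without a Flink dependency as follows. This assumes that the murmurHash/bitMix copy below matches org.apache.flink.util.MathUtils in your Flink version; in real producer code it is safer to call KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism) from flink-runtime than to copy the hash. The keys in main are hypothetical.

```java
// Sketch of Flink's key -> key group -> subtask mapping, without a Flink dependency.
// Assumption: murmurHash/bitMix reproduce org.apache.flink.util.MathUtils;
// in real code, prefer calling KeyGroupRangeAssignment (flink-runtime) directly.
public class KeyGroupMapping {

    // Final avalanche step of 32-bit MurmurHash3.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Murmur-style hash of a single int, mapped to a non-negative value.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // keyGroupId = murmurHash(key.hashCode()) % maxParallelism
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // slotIndex = keyGroupId * parallelism / maxParallelism (integer division),
    // so each subtask owns one contiguous range of key groups.
    static int subtaskForKeyGroup(int keyGroupId, int maxParallelism, int parallelism) {
        return keyGroupId * parallelism / maxParallelism;
    }

    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // must match the job's configured max parallelism
        int parallelism = 4;      // must match the keyed operator's parallelism
        for (String key : new String[] {"user-1", "user-2", "user-3"}) { // hypothetical keys
            System.out.println(key + " -> key group " + keyGroupFor(key, maxParallelism)
                    + " -> subtask " + subtaskForKey(key, maxParallelism, parallelism));
        }
    }
}
```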
(3) Then you'll need to use DataStreamUtils.reinterpretAsKeyedStream to get Flink to treat the pre-partitioned streams as keyed streams.
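Putting the pieces together on the producer side: compute the target slot for the key with the formula from point (2), then write the record to a partition that this slot reads, for example with Kafka's ProducerRecord(topic, partition, key, value) constructor, which sets the partition explicitly. The sketch below covers only that second step (inverting the assignment from point (1)) under the assumptions stated in its comments; the `spread` parameter and the topic name are hypothetical. On the Flink side, the source stream is then passed to DataStreamUtils.reinterpretAsKeyedStream(stream, keySelector), where the keySelector must return a key whose hashCode() is identical to the one the producer hashed, and the source and the keyed operators must run with the same parallelism and max parallelism so records are forwarded rather than redistributed.

```java
// Sketch: producer-side choice of a Kafka partition that is read by targetSubtask.
// Assumptions: the connector's default partition -> subtask assignment,
// source parallelism == keyed-operator parallelism, numPartitions >= parallelism.
public class TargetPartition {

    // Assumed default assignment (same formula as the connector's round-robin).
    static int subtaskForPartition(String topic, int partition, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partition) % parallelism;
    }

    // Inverse: targetSubtask reads partitions first, first + parallelism, first + 2 * parallelism, ...
    // 'spread' (hypothetical) selects one of them; derive it from the key to keep a key in one partition.
    static int partitionForSubtask(String topic, int targetSubtask, int parallelism,
                                   int numPartitions, int spread) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        int first = Math.floorMod(targetSubtask - startIndex, parallelism);
        if (first >= numPartitions) {
            throw new IllegalArgumentException("no partition is read by subtask " + targetSubtask);
        }
        // Number of partitions owned by targetSubtask (ceiling division).
        int owned = (numPartitions - first + parallelism - 1) / parallelism;
        return first + parallelism * Math.floorMod(spread, owned);
    }

    public static void main(String[] args) {
        String topic = "events"; // hypothetical topic name
        int parallelism = 4;
        int numPartitions = 12;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int partition = partitionForSubtask(topic, subtask, parallelism, numPartitions, 7);
            System.out.println("subtask " + subtask + " <- partition " + partition);
        }
    }
}
```

Using a hash of the key as `spread` keeps all records of one key in a single partition, which preserves per-key ordering; a random spread would balance the partitions better but could reorder a key's records.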
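One consequence of this approach is that it will be painful if you ever need to change the parallelism: both the key-to-slot mapping and the partition-to-slot mapping depend on it, so the data in Kafka would have to be repartitioned to match the new layout.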
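To get a feel for how much changes, the sketch below (hypothetical helper names, reusing the slot formula from point (2)) counts how many key groups would be owned by a different slot after rescaling from parallelism 4 to 8 with max parallelism 128. Records for keys in those key groups are already sitting in partitions that, in general, the new owner does not read.

```java
// Sketch: how many key groups change owner when the parallelism changes.
public class RescaleImpact {

    static int subtaskForKeyGroup(int keyGroupId, int maxParallelism, int parallelism) {
        return keyGroupId * parallelism / maxParallelism; // integer division
    }

    static int movedKeyGroups(int maxParallelism, int oldParallelism, int newParallelism) {
        int moved = 0;
        for (int kg = 0; kg < maxParallelism; kg++) {
            if (subtaskForKeyGroup(kg, maxParallelism, oldParallelism)
                    != subtaskForKeyGroup(kg, maxParallelism, newParallelism)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println(movedKeyGroups(128, 4, 8)); // prints 112
    }
}
```

Only key groups 0 to 15 stay on slot 0 in this example; the other 112 of 128 move.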