Let's say that I know that my dataset is unbalanced and I know the distribution of the keys. I'd like to leverage this to write a custom partitioner that gets the most out of the operator instances.
I know about DataStream#partitionCustom. However, if my stream is keyed, will it still work properly? My job would look something like:
DataStream<Tuple2<String, Long>> afterCustomPartition = keyedStream.partitionCustom(new MyPartitioner(), new MyPartitionKeySelector());
DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector()).sum(1); // sums field 1 of the (assumed) Tuple2<String, Long> records
What I'm trying to achieve is:
keyBy(x) = keyBy(y) => partition(x) = partition(y)
[Figure: example of the use-case]
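The requirement keyBy(x) = keyBy(y) => partition(x) = partition(y) only asks that the partitioner be a deterministic function of the key. Below is a minimal sketch of such a partitioner, assuming the heaviest keys are known in advance. SkewAwarePartitioner and its hot/cold split are hypothetical and not part of Flink. The class is plain Java so it runs without Flink on the classpath, but partition(key, numPartitions) has the same shape as org.apache.flink.api.common.functions.Partitioner#partition, which is what DataStream#partitionCustom takes.

```java
import java.util.List;

/**
 * Hypothetical skew-aware partitioner (plain Java, no Flink dependency).
 * partition(key, numPartitions) mirrors the shape of Flink's
 * Partitioner#partition, so the body could be moved into a Partitioner<String>.
 */
final class SkewAwarePartitioner {

    // Assumption: the heaviest keys are known up front, ordered by weight.
    private final List<String> hotKeys;

    SkewAwarePartitioner(List<String> hotKeys) {
        this.hotKeys = List.copyOf(hotKeys);
    }

    /**
     * Pure function of the key, so equal keys always map to the same partition.
     * The first `reserved` hot keys each get a dedicated partition (hot key i -> i);
     * every other key is hashed over the remaining partitions only.
     * At least one partition is always left for the cold keys.
     */
    int partition(String key, int numPartitions) {
        int reserved = Math.min(hotKeys.size(), numPartitions - 1);
        int hotIndex = hotKeys.indexOf(key);
        if (hotIndex >= 0 && hotIndex < reserved) {
            return hotIndex; // dedicated partition for a heavy key
        }
        int coldPartitions = numPartitions - reserved;
        // floorMod keeps the result non-negative even for negative hash codes
        return reserved + Math.floorMod(key.hashCode(), coldPartitions);
    }

    public static void main(String[] args) {
        SkewAwarePartitioner p = new SkewAwarePartitioner(List.of("hot-a", "hot-b"));
        for (String key : List.of("hot-a", "hot-b", "cold-x", "cold-y")) {
            System.out.println(key + " -> partition " + p.partition(key, 4));
        }
    }
}
```

Giving each heavy key a partition of its own is just one possible policy; whether the resulting partitioning can then be used with reinterpretAsKeyedStream() is a separate question.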
That is unfortunately not possible. DataStreamUtils.reinterpretAsKeyedStream() requires that the data be partitioned exactly as if you had called keyBy().
The reason for this limitation is key groups and how keys are mapped to them. A key group is Flink's unit for distributing keyed state. The number of key groups determines the maximum parallelism of an operator and is configured with setMaxParallelism(). Keys are assigned to key groups with an internal hash function, and each operator instance owns a contiguous range of key groups. If you change how keys are partitioned, keys that belong to the same key group end up on multiple machines. That breaks keyed state, because each key group must be handled by exactly one operator instance.
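The two-step routing (key to key group, key group to operator instance) can be modeled in a few lines. This is a simplified sketch: KeyGroupModel and its methods are hypothetical names, and mix() is a stand-in hash rather than Flink's internal one, so the concrete indices will not match a real job. The operator formula keyGroup * parallelism / maxParallelism reflects, to my understanding, how Flink's KeyGroupRangeAssignment hands contiguous key-group ranges to operator instances, with maxParallelism playing the role of the value set via setMaxParallelism().

```java
/**
 * Simplified model of how keyBy() routes a key in Flink:
 * key -> key group -> operator (subtask) index.
 * mix() is a stand-in hash, NOT Flink's exact internal hash.
 */
final class KeyGroupModel {

    // Stand-in for Flink's internal hash of key.hashCode() (assumption: any well-mixed hash works here).
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so the result is non-negative
    }

    /** key -> key group in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    /** key group -> operator index; each operator owns a contiguous range of key groups. */
    static int operatorForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** The operator instance keyBy() would send this key to. */
    static int keyByTarget(Object key, int maxParallelism, int parallelism) {
        return operatorForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    /** A custom partitioner's choice only works for keyed state if it hits the owner of the key's key group. */
    static boolean isCompatible(Object key, int chosenOperator, int maxParallelism, int parallelism) {
        return chosenOperator == keyByTarget(key, maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"a", "b", "c"}) {
            System.out.printf("%s -> key group %d -> operator %d%n", key,
                    keyGroupFor(key, maxParallelism),
                    keyByTarget(key, maxParallelism, parallelism));
        }
    }
}
```

In this model, a custom partitioner is only safe for keyed state if it picks keyByTarget(key, ...) for every key. Any other choice puts the key on an operator that does not own its key group, which is the case isCompatible() rejects.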
To change the assignment of keys to machines, you would need to change the assignment of keys to key groups. However, there is no public or accessible interface for that, so custom key distributions are not supported in Flink 1.6.