apache-flink, flink-streaming

Can I use a custom partitioner with group by?


Let's say I know that my dataset is unbalanced and I know the distribution of the keys. I'd like to leverage this by writing a custom partitioner that gets the most out of the operator instances.

I know about DataStream#partitionCustom. However, if my stream is keyed, will it still work properly? My job would look something like:

DataStream<T> afterCustomPartition = keyedStream.partitionCustom(new MyPartitioner(), new MyPartitionKeySelector())

DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector<>()).sum()

What I'm trying to achieve is:

  • Keying the stream by some key, so that the reduce function is only ever called with elements of that key.
  • Having the group-by split the work across nodes according to a custom partitioning.
  • Having the custom partitioner return a number based on the number of parallel operator instances (which is fixed and not subject to rescaling).
  • Having the custom partitioner return different values from keyBy, while still guaranteeing that keyBy(x) = keyBy(y) => partition(x) = partition(y).
  • Pre-aggregating before partitioning to minimize network traffic.

Example of the use-case:

  • Dataset: [(0, A), (0, B), (0, C), (1, D), (2, E)]
  • Number of parallel operator instances: 2
  • Group by function: returns the 1st element of the pair
  • Partition function: returns 0 for key 0 and 1 for keys 1 and 2. Advantage: it avoids the data skew that would arise if keys 0 and 1 were sent to the same operator instance, in which case that instance would receive 80% of the dataset (4 of the 5 elements).
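
To make the numbers in this example concrete, here is a minimal plain-Java sketch with no Flink dependency. It only counts how many records each of the 2 operator instances would receive under the partition function described above, compared with a hypothetical unlucky assignment that puts keys 0 and 1 together. The class and method names (SkewExample, customPartition, unluckyPartition, load) are made up for illustration and are not part of any Flink API.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntUnaryOperator;

class SkewExample {
    // Keys of the example dataset [(0, A), (0, B), (0, C), (1, D), (2, E)].
    static final int[] KEYS = {0, 0, 0, 1, 2};

    // Partition function from the example: key 0 -> instance 0, keys 1 and 2 -> instance 1.
    static int customPartition(int key) {
        return key == 0 ? 0 : 1;
    }

    // Hypothetical unlucky assignment: keys 0 and 1 land on the same instance.
    static int unluckyPartition(int key) {
        return key == 2 ? 1 : 0;
    }

    // Counts how many records each operator instance would receive.
    static Map<Integer, Integer> load(IntUnaryOperator partitioner) {
        Map<Integer, Integer> perInstance = new TreeMap<>();
        for (int key : KEYS) {
            perInstance.merge(partitioner.applyAsInt(key), 1, Integer::sum);
        }
        return perInstance;
    }

    public static void main(String[] args) {
        System.out.println("custom:  " + load(SkewExample::customPartition));
        System.out.println("unlucky: " + load(SkewExample::unluckyPartition));
    }
}
```

With the custom function the split is 3 records versus 2; with the unlucky assignment one instance gets 4 of the 5 records, which is the 80% case mentioned above.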

Solution

  • Unfortunately, that is not possible. DataStreamUtils.reinterpretAsKeyedStream() requires the data to be partitioned exactly as if you had called keyBy().

    The reason for this limitation is key groups and the way keys are mapped to them. A key group is Flink's unit for distributing keyed state. The number of key groups determines the maximum parallelism of an operator and is configured with setMaxParallelism(). Keys are assigned to key groups by an internal hash function. If you change the partitioning of keys, keys belonging to the same key group end up spread across multiple machines, while the state for that key group lives on only one of them, so this cannot work.
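
The mapping described above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Flink's code: mix() is a stand-in bit mixer rather than Flink's internal hash function, the contiguous-range formula in operatorIndexFor() is an assumption about how key groups are spread over operator instances, and maxParallelism = 128 is just a sample value. All names (KeyGroupSketch, mix, keyGroupFor, operatorIndexFor) are hypothetical.

```java
class KeyGroupSketch {
    // Stand-in for the internal hash function; NOT Flink's real implementation.
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo below is non-negative
    }

    // A key is hashed into one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Assumption: each operator instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // sample value, as would be set via setMaxParallelism()
        int parallelism = 2;
        for (int key = 0; key <= 2; key++) {
            int group = keyGroupFor(key, maxParallelism);
            int stateOwner = operatorIndexFor(group, maxParallelism, parallelism);
            int customTarget = (key == 0) ? 0 : 1; // partition function from the question
            System.out.printf("key=%d keyGroup=%d stateOwner=%d customTarget=%d%s%n",
                    key, group, stateOwner, customTarget,
                    stateOwner == customTarget ? "" : "  <-- record and state on different instances");
        }
    }
}
```

Whenever customTarget differs from stateOwner for a key, the custom partitioner would route the record to an instance that does not hold that key's key group, which is the conflict the paragraph above describes.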

    To change which machine a key is assigned to, you would need to change the assignment of keys to key groups. However, there is no public or otherwise accessible interface for doing that. Therefore, custom key distributions are not supported in Flink 1.6.