apache-flink

How does the Flink Optimizer decide on parallelism?


Below is a slide about Flink's optimizer from a presentation I watched. I'm particularly confused by the statement that Flink's optimizer decides on parallelism depending on the cardinalities of the provided datasets.

I'm currently going through the documentation for Flink 1.4 (the version I'm using), and I can't find anything about how Flink decides on parallelism. Do I need to provide Flink's optimizer with statistics about the datasets in order to take advantage of this feature?

[Image: slide on Flink's optimizer]

On a related note, I thought that specifying a maxParallelism value might enable Flink to automatically determine an appropriate level of parallelism for the provided dataset (as described above). However, I'm unable to set the max parallelism as described in the Flink 1.4 documentation, so I haven't been able to verify this hypothesis. For context, I am using the DataSet API. How do I specify max parallelism in Flink?

import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(20); // can't seem to call this method on env



Solution

  • I'm not sure where you found this presentation, but it is quite old, probably from 2014 or early 2015.

    The slide discusses the optimizer of Flink's DataSet API. This optimizer is not used for DataStream API programs. Conversely, the maximum parallelism setting applies only to DataStream API programs, not to DataSet programs.

    The quoted sentence is under the bullet point "Goal: efficient execution plans for data processing plans". Not all of its sub-points have been implemented; automatic configuration of the execution parallelism is one of those that was not.

    The roadmap of the Flink community includes the plan to integrate the DataSet API into the DataStream API and drop the optimizer. Flink's Table API / SQL will continue to have a cost-based optimizer (based on Apache Calcite) and might also configure the execution parallelism in the future.
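For reference, here is a minimal sketch of where each setting lives, assuming the Flink 1.4 Java APIs (`flink-java` and `flink-streaming-java` on the classpath). The DataSet API's `ExecutionEnvironment` only accepts a fixed parallelism via `setParallelism(int)`; `setMaxParallelism(int)` exists on the DataStream API's `StreamExecutionEnvironment`. Note that max parallelism does not make Flink pick a parallelism for you: it is the upper bound to which a streaming job with keyed state can later be rescaled (it fixes the number of key groups). The class and helper names below are illustrative only.

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSettings {

    // DataSet API (batch): only a fixed default parallelism can be set on the environment.
    // Individual operators can still override it with their own setParallelism(int).
    static ExecutionEnvironment batchEnv(int parallelism) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        return env;
    }

    // DataStream API (streaming): both the parallelism and the max parallelism exist here.
    // maxParallelism bounds later rescaling of keyed state; it does not choose a parallelism.
    static StreamExecutionEnvironment streamEnv(int parallelism, int maxParallelism) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.setMaxParallelism(maxParallelism);
        return env;
    }

    public static void main(String[] args) {
        ExecutionEnvironment batch = batchEnv(4);
        StreamExecutionEnvironment stream = streamEnv(4, 20);
        // Only configuration getters are called; no job is submitted or executed.
        System.out.println("batch parallelism:      " + batch.getParallelism());
        System.out.println("stream parallelism:     " + stream.getParallelism());
        System.out.println("stream max parallelism: " + stream.getMaxParallelism());
    }
}
```

In a DataSet program, the closest equivalent to the snippet in the question is therefore `env.setParallelism(20)`, which sets a fixed value rather than an upper bound.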
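Since the DataSet optimizer in Flink 1.4 does not derive the parallelism from dataset cardinalities, one workaround is to estimate the input size yourself and pass the result to `setParallelism(int)`. The sketch below shows one such heuristic in plain Java with no Flink dependency. `chooseParallelism`, the per-subtask record target, and the slot limit are hypothetical and chosen only for illustration; none of them is part of Flink.

```java
public final class ParallelismHeuristic {

    private ParallelismHeuristic() {}

    /**
     * Hypothetical helper (not a Flink API): returns ceil(estimatedRecords / recordsPerSubtask),
     * clamped to the range [1, availableSlots].
     */
    static int chooseParallelism(long estimatedRecords, long recordsPerSubtask, int availableSlots) {
        if (estimatedRecords < 0 || recordsPerSubtask <= 0 || availableSlots <= 0) {
            throw new IllegalArgumentException(
                    "estimatedRecords must be >= 0; recordsPerSubtask and availableSlots must be > 0");
        }
        // Ceiling division written without (a + b - 1) to avoid overflow near Long.MAX_VALUE.
        long wanted = estimatedRecords / recordsPerSubtask
                + (estimatedRecords % recordsPerSubtask == 0 ? 0 : 1);
        // At least one subtask, and never more than the slots the cluster offers.
        return (int) Math.max(1L, Math.min(wanted, (long) availableSlots));
    }

    public static void main(String[] args) {
        System.out.println(chooseParallelism(0L, 1_000_000L, 16));           // 1
        System.out.println(chooseParallelism(2_500_000L, 1_000_000L, 16));   // 3
        System.out.println(chooseParallelism(100_000_000L, 1_000_000L, 16)); // 16
    }
}
```

You would then call something like `env.setParallelism(ParallelismHeuristic.chooseParallelism(estimate, 1_000_000L, 16))`. Where `estimate` comes from (input file sizes, counts from an earlier run) is up to you, because Flink does not supply it.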