apache-flink

Using parallelism > 1 and `env.fromElements` causing app to hang


I am running a small test case with JUnit 5:

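import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
// @ClassRule is a JUnit 4 rule annotation; MiniClusterWithClientResource is a JUnit 4 rule
import org.junit.ClassRule;
import org.junit.Test;

public class APipelineTest {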
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(1)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    public void shouldRun() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2); // <- only works when parallelism is set to 1
                               //    if set >1, the app hangs, and prints nothing

        var src = env.fromElements(1, 2, 3, 4, 5, 6);

        src.map(i -> {
            System.out.println("map: " + i);
            return i;
        });

        env.execute();
    }
}

I do not understand why the app works with env.setParallelism(1) but prints nothing when the parallelism is greater than 1.

I have seen a similar phenomenon when watermarks are not aligned across parallel stateful instances: computations are never triggered and the application appears stuck. But for such a simple case, I cannot see how unaligned watermarks would explain this.


Solution

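  • You created a Flink MiniCluster with one TaskManager that has one task slot, so the whole cluster offers only a single slot:

                                .setNumberSlotsPerTaskManager(1)
                                .setNumberTaskManagers(1)

  • With Flink's default slot sharing, a job needs as many slots as the highest parallelism of any of its operators. After env.setParallelism(2), the map operator runs with parallelism 2, so the job needs two slots. The scheduler waits for a second slot that never becomes available (until the slot request eventually times out), so no task is deployed and nothing is printed; watermarks play no role here. Either keep the parallelism at 1 or give the cluster at least two slots, for example with .setNumberSlotsPerTaskManager(2) or .setNumberTaskManagers(2).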
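The slot arithmetic behind the hang can be checked with a small stand-alone model. This is a simplified sketch, not Flink code: the class SlotCheck and its methods are hypothetical, and it assumes every operator stays in the default slot sharing group, so the job needs as many slots as its highest operator parallelism. The source parallelism used below is illustrative only; the result does not depend on it, because the map alone runs with parallelism 2.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model (not part of Flink) of the slot check.
 * Assumption: all operators share the default slot sharing group, so one slot
 * can host one parallel instance of every operator, and the job needs as many
 * slots as its highest operator parallelism.
 */
public class SlotCheck {

    // Total slots the cluster offers: task managers x slots per task manager.
    static int availableSlots(int numTaskManagers, int slotsPerTaskManager) {
        return numTaskManagers * slotsPerTaskManager;
    }

    // Slots the job needs under a single (default) slot sharing group.
    static int requiredSlots(int[] operatorParallelisms) {
        return Arrays.stream(operatorParallelisms).max().orElse(0);
    }

    // True if the job fits; false means the scheduler would wait for slots.
    static boolean canSchedule(int numTaskManagers, int slotsPerTaskManager,
                               int[] operatorParallelisms) {
        return requiredSlots(operatorParallelisms)
                <= availableSlots(numTaskManagers, slotsPerTaskManager);
    }

    public static void main(String[] args) {
        // Illustrative parallelisms for {source, map}; the map value dominates.
        int[] parallelismOne = {1, 1};
        int[] parallelismTwo = {1, 2};

        System.out.println(canSchedule(1, 1, parallelismOne)); // true
        System.out.println(canSchedule(1, 1, parallelismTwo)); // false: 2 slots needed, 1 offered
        System.out.println(canSchedule(1, 2, parallelismTwo)); // true after adding a slot
    }
}
```

The model deliberately ignores custom slot sharing groups and fine-grained resource profiles; with those, one slot no longer hosts every operator and the count would differ.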