I run a small test case using JUnit5
public class APipelineTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.build());
@Test
public void shouldRun() throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // <- only works when parallelism is set to 1
// if set >1, the app hangs, and prints nothing
var src = env.fromElements(1, 2, 3, 4, 5, 6);
src.map(i -> {
System.out.println("map: " + i);
return i;
});
env.execute();
}
}
I do not understand why the app works when env.setParallelism(1)
and nothing gets print out when parallelism is set larger than 1.
I have seen similar phenomenon when watermark is not aligned among parallel stateful instances, causing computation to not be triggered, expressed as a stuck application , but for such a simple case, I cannot connect how unaligned watermarks can be used to explain this case.
You created a Flink MiniCluster with one Task Manager and one slot per Task Manager, so your job's maximum parallelism is 1.
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)