Search code examples
intellij-ideaapache-flinkflink-streaming

How to set flink parallelism in pure java (IDEA)


I am using following scala code to run my flink streaming job

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
    mystream.addSource(new mySource(params))
      .map(new myMap(params))
      .addSink(new mySink(params)).setParallelism(1)
    mystream.setParallelism(1)
    mystream.execute("My Streaming")

when I use flink run -p 1, the parallelism is 1(do not know whether -p works or the code works). when I use pure java to run, (in IDEA I suppose it runs in pure java), the parallelism is usually 5, which shows my code does not work. How to control it?


as top answer suggested, following code also does not work, still has parellelism of 5.

val mystream = StreamExecutionEnvironment.getExecutionEnvironment
    mystream.addSource(new mySource(params))
      .map(new myMap(params))
      .addSink(new mySink(params))
    mystream.setParallelism(1)
    mystream.execute("My Streaming")

Solution

  • You set the default parallelism on the environment.

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.addSource(...)
    

    Using .addSink(new mySink(params)).setParallelism(1) overrides that default parallelism for the specific operator.