Tags: apache-flink, flink-streaming

How to setup apache Flink local execution environment with multi-threading?


I would like to test Apache Flink's partitioning. To do so, I wrote a simple program in Scala: a source generates alternating 0s and 1s, the stream is keyed by the value, and the processor prints the thread id and subtask id. I expect 0 and 1 to be processed in different subtasks.

  // Emits 1, 0, 1, 0, … for a total of 1000 elements
  case class Input(var count: Long = 0) extends Iterator[Long] {
    override def hasNext: Boolean = count < 1000

    override def next(): Long = {
      count += 1
      count % 2
    }
  }

  case class Process() extends RichFlatMapFunction[Long, Long] {
    override def flatMap(value: Long, out: Collector[Long]): Unit = {
      println(s"${Thread.currentThread().getId} ${getRuntimeContext.getIndexOfThisSubtask}/${getRuntimeContext.getNumberOfParallelSubtasks} $value")
      Thread.sleep(1000)
      out.collect(value) // forward the value so the downstream print() sink receives it
    }
  }

  def exp003(): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment(parallelism = 3)
    env.fromCollection(Input()).keyBy[Long]{x: Long => x}.flatMap(Process()).print()
    env.execute()
  }

But no matter what parallelism I set, all the data is forwarded to the same subtask, so there is no parallelism. The output looks like:

84 2/3 1
84 2/3 0
84 2/3 1
84 2/3 0
84 2/3 1
84 2/3 0
84 2/3 1
84 2/3 0
84 2/3 1
84 2/3 0
84 2/3 1
...

I expect value 0 to be processed by subtask 0 and value 1 to be processed by subtask 1.

P.S. The Flink version I'm using is 1.17.0.


Solution

  • Flink determines the subtask for a key by first computing the key's hashCode(), running that through a Murmur3 hash, taking that value modulo the job's "max parallelism" (128 by default) to get the key's "key group", and finally mapping the key group to a subtask index as keyGroup * parallelism / maxParallelism.

    So there's no direct mapping from a key with value 0 to sub-task 0.

    Try changing your source to generate values from 0 to 10,000 and see how they get distributed across subtasks.
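
The mapping described above can be sketched outside of Flink. The Murmur3 implementation below is a standalone sketch intended to mirror what Flink does internally (in `MathUtils.murmurHash` and `KeyGroupRangeAssignment`) — treat it as an illustration of the hash → key group → subtask pipeline, not Flink's exact code — but it shows why two adjacent keys like 0 and 1 can easily end up in the same key group range and hence on the same subtask:

```scala
object KeyGroupSketch {
  // Murmur3 32-bit hash of a single int block with seed 0.
  // This is a sketch approximating Flink's MathUtils.murmurHash (assumption).
  def murmurHash(code: Int): Int = {
    var k = code * 0xcc9e2d51
    k = Integer.rotateLeft(k, 15)
    k *= 0x1b873593
    var h = k                       // seed 0 XOR'ed with the mixed block
    h = Integer.rotateLeft(h, 13)
    h = h * 5 + 0xe6546b64
    h ^= 4                          // input length in bytes
    h ^= h >>> 16                   // final avalanche mix
    h *= 0x85ebca6b
    h ^= h >>> 13
    h *= 0xc2b2ae35
    h ^= h >>> 16
    h
  }

  // Key group: Murmur3 of the key's hashCode, modulo maxParallelism (128 by default).
  def keyGroup(key: Any, maxParallelism: Int = 128): Int = {
    val m = murmurHash(key.hashCode) % maxParallelism
    if (m < 0) m + maxParallelism else m // force a non-negative group index
  }

  // Subtask index: the key group scaled into the operator's parallelism.
  def subtaskIndex(key: Any, parallelism: Int, maxParallelism: Int = 128): Int =
    keyGroup(key, maxParallelism) * parallelism / maxParallelism

  def main(args: Array[String]): Unit =
    for (k <- 0L to 9L)
      println(s"key $k -> keyGroup ${keyGroup(k)} -> subtask ${subtaskIndex(k, parallelism = 3)}")
}
```

With parallelism 3 and max parallelism 128, each subtask owns a contiguous range of roughly 42–43 key groups, so whether two keys are separated depends entirely on where their hashes fall in [0, 128), not on the key values themselves.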