apache-flink, flink-streaming

Flink KeyedProcessFunction Ordering


I'm new to Flink and trying to understand how Flink orders calls to processElement() in its KeyedProcessFunction abstraction when running with parallelism greater than one. Consider this example of producing a stream of partial sums:

package sample

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

object Playground {
  case class Record(groupId: String, score: Int)

  def main(args: Array[String]): Unit = {
    // 1. Create the environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
    env.setParallelism(10)

    // 2. Source
    val record1 = Record("groupX", 1)
    val record2 = Record("groupX", 2)
    val record3 = Record("groupX", 3)
    val records: DataStream[Record] = env.fromElements(record1, record2, record3, record1, record2, record3)

    // 3. Application Logic
    val partialSums: DataStream[Int] = records
      .keyBy(record => record.groupId)
      .process(new KeyedProcessFunction[String, Record, Int] {
        // Store partial sum of score for Records seen
        lazy val partialSum: ValueState[Int] = getRuntimeContext.getState(
          new ValueStateDescriptor[Int]("partialSum", classOf[Int]))

        // Ingest new record
        override def processElement(value: Record,
                                    ctx: KeyedProcessFunction[String, Record, Int]#Context,
                                    out: Collector[Int]): Unit = {
          // value() is null for a key with no state yet; Scala unboxes that to 0
          val currentSum: Int = partialSum.value()
          partialSum.update(currentSum + value.score)
          out.collect(partialSum.value())
        }
      })

    // 4. Sink
    partialSums.print()

    // 5. Build JobGraph and execute
    env.execute("sample-job")
  }
}

I would expect the output of this to be the stream 1, 3, 6, 7, 9, 12, and indeed it is when I run it locally.

Is it safe to assume this will always be the case, particularly when reading from sources with high parallelism?


Solution

  • In your example, the ordering is guaranteed within each key. Since there is only one key, you will always get 1, 3, 6, 7, 9, 12. (A two-key variant is sketched at the end of this answer.)

    When you are reading from a source with parallelism greater than one, the various source instances will race against each other. When the streams from two or more sources are brought together (e.g., via keyBy, union, or rebalance), the result is non-deterministic, but the events from each source will keep their relative ordering (see the runnable sketch after the example below).

    For example, if you have

    stream X: 1, 2, 3, 4
    stream Y: a, b, c, d
    

    and then bring those two streams together, you might get

    1, 2, 3, 4, a, b, c, d, or a, b, 1, 2, 3, c, 4, d, and so on; what you will never see is 2 before 1, or b before a.
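
    To make this concrete, here is a minimal sketch of the two-stream example above, written against the same Scala DataStream API as the question (the object name UnionOrdering and the parallelism-1 sink are illustrative choices, not part of the original code):

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}

    object UnionOrdering {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
        env.setParallelism(10)

        // Two independent pipelines whose subtasks race against each other
        val x: DataStream[String] = env.fromElements("1", "2", "3", "4")
        val y: DataStream[String] = env.fromElements("a", "b", "c", "d")

        // Funnel everything through a single sink subtask so the interleaving
        // is visible in one place: "1" always precedes "2" and "a" always
        // precedes "b", but how x and y interleave varies from run to run.
        x.union(y).print().setParallelism(1)

        env.execute("union-ordering")
      }
    }

    Running this a few times shows different interleavings, but never an inversion within x or within y.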
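
    And to see the per-key guarantee with more than one key, here is a sketch of a two-key variant of the question's job (the groupY records and the use of the built-in sum aggregation are assumptions added for illustration): within each group the running sums always appear in order, even though lines from different groups may interleave arbitrarily in the printed output.

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}

    object TwoKeyOrdering {
      case class Record(groupId: String, score: Int)

      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
        env.setParallelism(10)

        // Interleave two keys in the input (groupY is hypothetical, added for illustration)
        val records: DataStream[Record] = env.fromElements(
          Record("groupX", 1), Record("groupY", 10),
          Record("groupX", 2), Record("groupY", 20),
          Record("groupX", 3), Record("groupY", 30))

        // Running sum per key: groupX always produces scores 1, 3, 6 in that
        // order and groupY always produces 10, 30, 60 in that order, but the
        // two groups may land on different parallel subtasks, so their lines
        // can interleave differently from run to run.
        records
          .keyBy(_.groupId)
          .sum("score")
          .print()

        env.execute("two-key-ordering")
      }
    }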