I'm new to Flink and trying to understand how Flink orders calls to `processElement()` in its `KeyedProcessFunction` abstraction under parallelism. Consider this example, which produces a stream of partial sums:

    package sample

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
    import org.apache.flink.util.Collector

    object Playground {
      case class Record(groupId: String, score: Int) {}

      def main(args: Array[String]): Unit = {
        // 1. Create the environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
        env.setParallelism(10)

        // 2. Source
        val record1 = Record("groupX", 1)
        val record2 = Record("groupX", 2)
        val record3 = Record("groupX", 3)
        val records: DataStream[Record] = env.fromElements(record1, record2, record3, record1, record2, record3)

        // 3. Application Logic
        val partialSums: DataStream[Int] = records
          .keyBy(record => record.groupId)
          .process(new KeyedProcessFunction[String, Record, Int] {
            // Store partial sum of score for Records seen
            lazy val partialSum: ValueState[Int] = getRuntimeContext.getState(
              new ValueStateDescriptor[Int]("partialSum", classOf[Int]))

            // Ingest new record
            override def processElement(value: Record,
                                        ctx: KeyedProcessFunction[String, Record, Int]#Context,
                                        out: Collector[Int]): Unit = {
              val currentSum: Int = partialSum.value()
              partialSum.update(currentSum + value.score)
              out.collect(partialSum.value())
            }
          })

        // 4. Sink
        partialSums.print()

        // 5. Build JobGraph and execute
        env.execute("sample-job")
      }
    }

I would expect the output to be the stream `1, 3, 6, 7, 9, 12`, and indeed it is, here.
Is it safe to assume this will always be the case, particularly when reading from sources with large parallelism?
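In your example, the ordering is guaranteed within each key, because all of the events for that key come from a single source instance (`fromElements` is a non-parallel source, so `setParallelism(10)` does not split it). Since there is only one key, you will always get `1, 3, 6, 7, 9, 12`.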
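To see why a single, in-order key gives a deterministic result, here is a minimal plain-Scala model (no Flink dependency) of what the `KeyedProcessFunction` does: it is handed one record at a time and keeps a separate partial sum per key. The names `KeyedRunningSum`, `run` and `outputsFor` are hypothetical, and the immutable `Map` only stands in for Flink's keyed `ValueState`; it is not Flink's API.

```scala
// Plain-Scala model of a keyed running sum (not Flink code).
// Assumption: keyed state is modeled as an immutable Map from key to partial sum,
// and records are processed strictly one at a time, in the order given.
object KeyedRunningSum {
  final case class Record(groupId: String, score: Int)

  // Process records in order; emit (key, partialSum) after each record.
  def run(records: Seq[Record]): Seq[(String, Int)] = {
    val (_, emitted) =
      records.foldLeft((Map.empty[String, Int], Vector.empty[(String, Int)])) {
        case ((state, out), r) =>
          // Treat a key with no state yet as having a partial sum of 0.
          val sum = state.getOrElse(r.groupId, 0) + r.score
          (state.updated(r.groupId, sum), out :+ (r.groupId -> sum))
      }
    emitted
  }

  // The subsequence of emitted partial sums that belongs to one key.
  def outputsFor(key: String, emitted: Seq[(String, Int)]): Seq[Int] =
    emitted.collect { case (`key`, sum) => sum }
}
```

Feeding it the question's six `groupX` records yields `1, 3, 6, 7, 9, 12`. If you add a second key and interleave the two keys' records differently (while keeping each key's own order), the combined output changes, but each key's subsequence of partial sums stays the same.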
When you are reading from a source with parallelism greater than one, the various source instances race against each other. When the streams from two or more sources (or source instances) are brought together (e.g., via `keyBy`, `union`, `rebalance`, etc.), the result is non-deterministic, although the events from each source instance keep their relative ordering.
For example, if you have

    stream X: 1, 2, 3, 4
    stream Y: a, b, c, d

and then bring those two streams together, you might get `1, 2, 3, 4, a, b, c, d`, or `a, b, 1, 2, 3, c, 4, d`, etc.
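That behavior can be modeled in plain Scala (again, this is not Flink code; `Interleavings`, `all` and `keepsOrder` are hypothetical names). The sketch enumerates every way the two streams could be merged when each keeps its own relative order but the two race each other; for two streams of four elements that is C(8, 4) = 70 possible results. It assumes the elements of the two streams are all distinct, which `keepsOrder` relies on.

```scala
// Plain-Scala model of merging two ordered streams that race each other.
// Each merged result preserves the relative order within X and within Y,
// but any interleaving between them is possible.
object Interleavings {
  def all[A](xs: List[A], ys: List[A]): List[List[A]] = (xs, ys) match {
    case (Nil, _) => List(ys)
    case (_, Nil) => List(xs)
    case (x :: xt, y :: yt) =>
      // The next element delivered comes either from X or from Y.
      all(xt, ys).map(x :: _) ++ all(xs, yt).map(y :: _)
  }

  // True if the elements of `sub` appear in `merged` in the same relative order.
  // Assumes elements are distinct across the two streams.
  def keepsOrder[A](sub: List[A], merged: List[A]): Boolean =
    merged.filter(sub.contains) == sub
}
```

Both orderings listed above are among the 70 results, while an ordering that swaps `1` and `2` is not, because it would break stream X's own order.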