Tags: apache-flink, flink-streaming

Ordering of records in a keyed stream in Flink


I have a stream in which the records arrive in order. I apply a map function and then a keyBy to it. Will the order of the records be maintained within each group of records that share the same key?

There was a similar question in Ordering of Records in Stream, but I cannot reconcile the answer given there with the description below, copied from "https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html".

" In a redistributing exchange the ordering among the elements is only preserved within each pair of sending and receiving subtasks (for example, subtask[1] of map() and subtask[2] of keyBy/window). So in this example, the ordering within each key is preserved, but the parallelism does introduce non-determinism regarding the order in which the aggregated results for different keys arrive at the sink."

In the example given, subtask[2] of keyBy receives elements from both subtask[1] and subtask[2] of map. How does the ordering within each key get preserved if the ordering is maintained only between subtasks?


Solution

  • A keyBy operation only maintains the order for events coming from the same subtask. For events coming from different subtasks, Flink does not give you any order guarantees.

    To illustrate this, assume the following scenario: you have two map subtasks, map1 and map2, and two sink subtasks, sink1 and sink2. Between the mappers and the sinks there is a keyBy operation.

    map1 produces the sequence of events (1, A), (2, B), (1, C), (2, D), and map2 produces (1, U), (1, V), (2, W), (2, X), where the first tuple entry is the key. This means that sink1 will receive the set {(1, A), (1, C), (1, U), (1, V)} and sink2 will receive the set {(2, B), (2, D), (2, W), (2, X)}.

    Without loss of generality, let's take a look at the sequence order of sink1. What you can say is that all events coming from the same producing subtask arrive in the same order as they were produced. Thus, (1, A) will arrive before (1, C). However, you cannot say what the order is between events coming from different producing subtasks. So you don't know whether (1, A) arrives before (1, U) or not.
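The scenario above can be sketched as a small simulation. This is plain Python, not Flink code: it only models the guarantee described here, assuming each producer delivers its own events first-in, first-out while the interleaving between producers is arbitrary. The names `produced`, `route`, `simulate`, and `per_producer_order_holds` are hypothetical, and the fixed routing of key 1 to sink1 and key 2 to sink2 is taken from the example rather than from how Flink actually assigns keys to subtasks.

```python
import random
from collections import defaultdict

# Events emitted by each upstream map subtask, in production order.
# Each event is (key, payload), mirroring the example above.
produced = {
    "map1": [(1, "A"), (2, "B"), (1, "C"), (2, "D")],
    "map2": [(1, "U"), (1, "V"), (2, "W"), (2, "X")],
}

# Assumed routing for this example only: key 1 -> sink1, key 2 -> sink2.
route = {1: "sink1", 2: "sink2"}


def simulate(seed):
    """Deliver all events, picking at random which producer sends next."""
    rng = random.Random(seed)
    pending = {name: list(events) for name, events in produced.items()}
    received = defaultdict(list)
    while any(pending.values()):
        # Any producer that still has events may go next; this models the
        # arbitrary interleaving between different sending subtasks.
        name = rng.choice([n for n, ev in pending.items() if ev])
        key, payload = pending[name].pop(0)  # FIFO within one producer
        received[route[key]].append((name, key, payload))
    return received


def per_producer_order_holds(received):
    """Check that each sink sees every producer's events in emitted order."""
    for sink, events in received.items():
        for name, emitted in produced.items():
            expected = [(k, p) for k, p in emitted if route[k] == sink]
            actual = [(k, p) for n, k, p in events if n == name]
            if actual != expected:
                return False
    return True


if __name__ == "__main__":
    for seed in range(3):
        result = simulate(seed)
        sink1_payloads = [p for _, _, p in result["sink1"]]
        print(seed, sink1_payloads, per_producer_order_holds(result))
```

Running it with different seeds should show sink1 receiving A before C and U before V every time, while the relative position of, say, A and U changes from run to run.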