
Flink - Why does a bounded stream print its elements out of order?


In the code below:

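    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromElements(
        Tuple2.of("01", 1),
        Tuple2.of("02", 2),
        Tuple2.of("03", 3),
        Tuple2.of("04", 4),
        Tuple2.of("05", 5)
    );
    dataStream.print();

    // The job is only submitted and run once execute() is called
    env.execute();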

Output shows:

    Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1227405543]
    08/26/2023 19:00:45     Job execution switched to status RUNNING.
    08/26/2023 19:00:45     Source: Collection Source(1/1) switched to SCHEDULED
    08/26/2023 19:00:45     Source: Collection Source(1/1) switched to DEPLOYING
    08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to SCHEDULED
    08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to DEPLOYING
    08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to SCHEDULED
    08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to DEPLOYING
    08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to SCHEDULED
    08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to DEPLOYING
    08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to SCHEDULED
    08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to DEPLOYING
    08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to SCHEDULED
    08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to DEPLOYING
    08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to SCHEDULED
    08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to DEPLOYING
    08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to SCHEDULED
    08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to DEPLOYING
    08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to SCHEDULED
    08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to DEPLOYING
    08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to RUNNING
    08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to RUNNING
    08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to RUNNING
    08/26/2023 19:00:45     Source: Collection Source(1/1) switched to RUNNING
    08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to RUNNING
    08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to RUNNING
    08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to RUNNING
    08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to RUNNING
    08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to RUNNING
    5> (05,5)
    1> (01,1)
    3> (03,3)
    2> (02,2)
    4> (04,4)
    08/26/2023 19:00:45     Source: Collection Source(1/1) switched to FINISHED
    08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to FINISHED
    08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to FINISHED
    08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to FINISHED
    08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to FINISHED
    08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to FINISHED
    08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to FINISHED
    08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to FINISHED
    08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to FINISHED
    08/26/2023 19:00:45     Job execution switched to status FINISHED.

  1. Why is the output out of order? How can I make the elements print in sequence?

    5> (05,5)
    1> (01,1)
    3> (03,3)
    2> (02,2)
    4> (04,4)
    
  2. Why does the trace show multiple sinks (eight of them)?


Solution

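  • Flink has a concept of parallelism: each operator can run as several parallel instances (subtasks), which you can think of as threads working at the same time. If you do not set the parallelism explicitly, a local execution environment uses the number of available CPU cores as the default parallelism. That is why the trace shows 8 sink subtasks, Unnamed(1/8) through Unnamed(8/8). The source shows (1/1) because fromElements creates a non-parallel source.

    In your code, the single source subtask hands the five tuples to different sink subtasks, and those subtasks print concurrently. The number before ">" in each output line is the index of the sink subtask that printed it, and because the threads run independently, the order in which their lines appear is not deterministic.

    Within one subtask, records are processed in the order they arrive, so when all the tuples go through the same thread they are printed in the order you defined them. You can set the global parallelism to 1 with env.setParallelism(1);, but this removes parallelism from the whole job and can reduce throughput.

    If you want the rest of the job to stay parallel and still get ordered output, change the parallelism of only the sink, for example dataStream.print().setParallelism(1);, so that every record is printed by the same thread. This works here because the source is also non-parallel; if parallel operators sit between the source and the sink, their outputs can already be interleaved before they reach the sink.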
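To see the mechanism outside Flink, here is a minimal plain-Java model. It is not Flink code, and the class ParallelPrintModel and its methods distribute and runSinks are hypothetical names for illustration. It assumes records move from the single source subtask to the sink subtasks round-robin, starting at subtask 1 as in the trace above (Flink's rebalance may start at a different subtask depending on the version). Each "sink subtask" is a thread that writes its records in arrival order into one shared output, and, like Flink's print sink, the "k> " prefix is only added when there is more than one subtask.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelPrintModel {

    // Hypothetical round-robin assignment: record i goes to subtask i % parallelism.
    // Each subtask's list keeps its records in source order.
    static List<List<String>> distribute(List<String> records, int parallelism) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            partitions.get(i % parallelism).add(records.get(i));
        }
        return partitions;
    }

    // Runs each partition on its own thread, like independent sink subtasks.
    // All threads append to one shared list, so the order across threads
    // depends on scheduling; the order within one thread is preserved.
    static List<String> runSinks(List<List<String>> partitions) {
        List<String> output = Collections.synchronizedList(new ArrayList<>());
        int parallelism = partitions.size();
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        for (int k = 0; k < parallelism; k++) {
            // 1-based subtask index, omitted when there is only one subtask
            final String prefix = parallelism > 1 ? (k + 1) + "> " : "";
            final List<String> partition = partitions.get(k);
            pool.submit(() -> {
                for (String record : partition) {
                    output.add(prefix + record);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("sink threads did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        List<String> records = List.of("(01,1)", "(02,2)", "(03,3)", "(04,4)", "(05,5)");

        System.out.println("parallelism 8 (order may change between runs):");
        runSinks(distribute(records, 8)).forEach(System.out::println);

        System.out.println("parallelism 1 (always source order):");
        runSinks(distribute(records, 1)).forEach(System.out::println);
    }
}
```

Running main several times can show the parallelism-8 lines in a different order each time, while the parallelism-1 lines always come out as (01,1) through (05,5). No thread reorders its own records; only the interleaving across threads varies, which is why funnelling everything through a single sink subtask restores the sequence.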