Apache Flink - ConnectedStream order and backpressure


The sample code below shows two source functions, one producing the even numbers below 20 and the other producing the odd numbers below 20, connected together to output the union of both streams and print it.

Sample code:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env 
    .addSource(new SourceFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            for(int i=0; i < 20; i=i+2) {
                System.out.println(System.currentTimeMillis()+":SourceA: " + i);
                ctx.collect(i);
                Thread.sleep(1000);
            }               
        }

        @Override
        public void cancel() {}

    })
    .connect(env.addSource(new SourceFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            for(int i=1; i < 20; i=i+2) {
                System.out.println(System.currentTimeMillis()+":SourceB: " + i);
                ctx.collect(i);
                Thread.sleep(1000);
            }               
        }

        @Override
        public void cancel() {}

    })).process(new CoProcessFunction<Integer, Integer, Integer>(){

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement1(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
            System.out.println(System.currentTimeMillis()+":OperatorA: "+arg0);
            Thread.sleep(5000);
            arg2.collect(arg0);
        }

        @Override
        public void processElement2(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
            System.out.println(System.currentTimeMillis()+":OperatorB: "+arg0);
            Thread.sleep(5000);
            arg2.collect(arg0);

        }

    })      
    .addSink(new SinkFunction<Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Integer value) throws Exception {
            System.out.println(System.currentTimeMillis()+":Sink: " + value);
        }

    });

    env.execute();

Output

1578465207355:SourceB: 1
1578465207379:SourceA: 0
1578465207437:OperatorA: 0
1578465208360:SourceB: 3
1578465208380:SourceA: 2
1578465209364:SourceB: 5
1578465209383:SourceA: 4
1578465210366:SourceB: 7
1578465210386:SourceA: 6
1578465211369:SourceB: 9
1578465211390:SourceA: 8
1578465212370:SourceB: 11
1578465212394:SourceA: 10
1578465212440:Sink: 0
1578465212441:OperatorB: 1
1578465213375:SourceB: 13
1578465213399:SourceA: 12
1578465214379:SourceB: 15
1578465214401:SourceA: 14
1578465215383:SourceB: 17
1578465215406:SourceA: 16
1578465216388:SourceB: 19
1578465216409:SourceA: 18
1578465217441:Sink: 1
1578465217441:OperatorB: 3
1578465222446:Sink: 3
1578465222446:OperatorB: 5
1578465227448:Sink: 5
1578465227449:OperatorB: 7
1578465232452:Sink: 7
1578465232453:OperatorB: 9
1578465237453:Sink: 9
1578465237453:OperatorB: 11
1578465242456:Sink: 11
1578465242456:OperatorA: 2
1578465247462:Sink: 2
1578465247462:OperatorA: 4
1578465252467:Sink: 4
1578465252467:OperatorA: 6

Q1.

Flink is supposed to ship whichever item arrives first on either of the connected streams to the CoProcessFunction. However, what we see here is that the number "2" is produced by its source function well before the number "11", yet "11" is sent to the CoProcessFunction before "2". Why is that?

Q2.

There is no backpressure happening in the connected stream. The source functions run to completion even though their elements are still being processed by the operator (simulated by Thread.sleep in the code above). Is there any way to get backpressure with a connected stream?

Code Edit V2

    Configuration config = new Configuration();
    config.setInteger("taskmanager.network.numberOfBuffers", 4);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);

    env.setParallelism(1);

    env 
    .addSource(new SourceFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            for(int i=0; i < 50000; i=i+2) {
                System.out.println(System.currentTimeMillis()+":SourceA: " + i);
                ctx.collect(i);
            }               
        }

        @Override
        public void cancel() {}

    })
    .connect(env.addSource(new SourceFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            for(int i=1; i < 50000; i=i+2) {
                System.out.println(System.currentTimeMillis()+":SourceB: " + i);
                ctx.collect(i);
            }               
        }

        @Override
        public void cancel() {}

    })).process(new CoProcessFunction<Integer, Integer, Integer>(){

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement1(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
            System.out.println(System.currentTimeMillis()+":OperatorA: "+arg0);
            Thread.sleep(5000);
            arg2.collect(arg0);
        }

        @Override
        public void processElement2(Integer arg0, CoProcessFunction<Integer, Integer, Integer>.Context arg1, Collector<Integer> arg2) throws Exception {
            System.out.println(System.currentTimeMillis()+":OperatorB: "+arg0);
            Thread.sleep(5000);
            arg2.collect(arg0);

        }

    })      
    .addSink(new SinkFunction<Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Integer value) throws Exception {
            System.out.println(System.currentTimeMillis()+":Sink: " + value);
        }

    });

    env.execute();

Output

1578605461497:SourceB: 7279
1578605461497:SourceB: 7281
1578605466406:Sink: 1
1578605466406:OperatorB: 3 <---- only odd numbers (input B) in the output
1578605471411:Sink: 3
1578605471411:OperatorB: 5
1578605476414:Sink: 5
1578605476415:OperatorB: 7
1578605481415:Sink: 7
1578605481415:OperatorB: 9
1578605486417:Sink: 9
1578605486417:OperatorB: 11
1578605491422:Sink: 11
1578605491422:OperatorB: 13
1578605496427:Sink: 13
1578605496427:OperatorB: 15
1578605501432:Sink: 15
1578605501432:OperatorB: 17
1578605506434:Sink: 17
1578605506434:OperatorB: 19
1578605511435:Sink: 19
1578605511435:OperatorB: 21
1578605516435:Sink: 21
1578605516436:OperatorB: 23
1578605521436:Sink: 23
1578605521436:OperatorB: 25
1578605526440:Sink: 25
1578605526440:OperatorB: 27
1578605531443:Sink: 27
1578605531443:OperatorB: 29
1578605536447:Sink: 29
1578605536447:OperatorB: 31
1578605541452:Sink: 31
1578605541452:OperatorB: 33
1578605546457:Sink: 33
1578605546457:OperatorB: 35
1578605551457:Sink: 35
1578605551457:OperatorB: 37
1578605556460:Sink: 37
1578605556460:OperatorB: 39
1578605561518:Sink: 39
1578605561519:OperatorB: 41
1578605566536:Sink: 41
1578605566536:OperatorB: 43
1578605571547:Sink: 43
1578605571547:OperatorB: 45
1578605576554:Sink: 45
1578605576554:OperatorB: 47
1578605581561:Sink: 47
1578605581562:OperatorB: 49
1578605586568:Sink: 49
1578605586568:OperatorB: 51
1578605591576:Sink: 51
1578605591576:OperatorB: 53
1578605596580:Sink: 53
1578605596580:OperatorB: 55
1578605601586:Sink: 55
1578605601587:OperatorB: 57
1578605606592:Sink: 57
1578605606592:OperatorB: 59
1578605611596:Sink: 59
1578605611596:OperatorB: 61
1578605616602:Sink: 61
1578605616602:OperatorB: 63
1578605621606:Sink: 63
1578605621606:OperatorB: 65
1578605626608:Sink: 65
1578605626608:OperatorB: 67
1578605631613:Sink: 67
1578605631613:OperatorB: 69
1578605636618:Sink: 69
1578605636618:OperatorB: 71

Solution

  • Q1

    It's important to understand that ordering guarantees apply only within a channel. This freedom allows operators with two inputs to actively choose which input to consume. Think of a hash join, which first needs to fully consume one side to build a hash table and then streams the second side to probe the table.

    In particular, for you, that means you do not have any ordering guarantees between two connected channels, as they are still logically and physically separated.

    Do you have a use case that requires ordering across both inputs?

    Q2.

    You cannot observe back pressure because you have too little data. On any networked channel you have buffers on the sender and receiver side. So until you saturate both, you will not see any back pressure being applied.
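
The buffering effect described above can be illustrated with a plain-Java analogy. This is only a sketch, not Flink's network stack: the class `BoundedBufferDemo` and the method `acceptedBeforeFull` are hypothetical names, and a single `ArrayBlockingQueue` stands in for the sender-side and receiver-side buffers. With no consumer draining the queue, a producer is only stopped once the buffer capacity is exhausted, so a small amount of data never hits the limit.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: a bounded queue standing in for network buffers.
class BoundedBufferDemo {

    /**
     * Offers up to `produced` elements to a queue of the given capacity while
     * nothing consumes them, and returns how many were accepted before it was full.
     */
    public static int acceptedBeforeFull(int capacity, int produced) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < produced; i++) {
            // offer() returns false instead of blocking when the buffer is full;
            // a blocking put() at this point is where a real producer would stall.
            if (!buffer.offer(i)) {
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Few elements, large buffer: the producer never notices the slow consumer.
        System.out.println(acceptedBeforeFull(1024, 20));
        // Many elements, small buffer: the producer is stopped early.
        System.out.println(acceptedBeforeFull(4, 50000));
    }
}
```

In this analogy, the first call corresponds to the original job (20 elements fit easily into the buffers), while the second corresponds to a job with many more elements than buffer space, where the producer would be held back.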

    edit: regarding your first comment

    Q1 CoGroupProcessor will alternate between inputs on a best-effort basis, exactly to avoid input starvation. However, when one of the inputs is idle, it will read only from the other input. After the idle input becomes busy again, it may take some time (< 1 ms) for that stream to be picked up again.
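
The alternation described above can be sketched in plain Java. This is an illustrative model only, not Flink's actual input-selection logic: the class `AlternatingReaderDemo` and the method `drain` are hypothetical, and two in-memory queues stand in for the two input channels. The reader switches sides after every element, but keeps reading from one side when the other has nothing available.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical demo class: best-effort alternation between two inputs.
class AlternatingReaderDemo {

    /**
     * Drains both queues, preferring the side that was not read last.
     * If the preferred side is empty (idle), the other side is read instead.
     */
    public static List<String> drain(Queue<Integer> a, Queue<Integer> b) {
        List<String> order = new ArrayList<>();
        boolean preferA = true;
        while (!a.isEmpty() || !b.isEmpty()) {
            // Read A if it is preferred and has data, or if B has nothing to offer.
            boolean readA = preferA ? !a.isEmpty() : b.isEmpty();
            if (readA) {
                order.add("A:" + a.poll());
            } else {
                order.add("B:" + b.poll());
            }
            // Next time, prefer the side that was not just read.
            preferA = !readA;
        }
        return order;
    }

    public static void main(String[] args) {
        Queue<Integer> a = new ArrayDeque<>(Arrays.asList(0, 2));
        Queue<Integer> b = new ArrayDeque<>(Arrays.asList(1, 3, 5, 7));
        // Prints [A:0, B:1, A:2, B:3, B:5, B:7]
        System.out.println(drain(a, b));
    }
}
```

Once input A runs dry, only B elements are emitted, which mirrors the "reads only from the other input" behaviour in a simplified form.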

    Q2 I adjusted your code and lowered the number of network buffers to 10 and removed the sleeps from your inputs and got the following output that shows the backpressure.

    1578560715990:SourceA: 0
    1578560715990:SourceB: 1
    ...
    1578560716041:OperatorA: 0 <-- blocks coprocessfunction
    ...
    1578560716280:SourceB: 29127 <-- at this point network buffers are full
    1578560721030:Sink: 0 <-- slow processing in coprocess function, no more inputs are generated because of backpressure
    1578560721030:OperatorB: 1
    1578560726034:Sink: 1
    1578560726034:OperatorA: 2 <-- clear alternation between inputs
    1578560731038:Sink: 2
    1578560731039:OperatorB: 3
    1578560736043:Sink: 3
    1578560736043:OperatorA: 4
    1578560741047:Sink: 4
    1578560741047:OperatorB: 5
    1578560746051:Sink: 5
    ...