apache-flink, flink-streaming

How does connectedStreams work in Apache Flink


I'm following the example in the official Flink docs to try to understand how connected streams work. Here is the example: https://ci.apache.org/projects/flink/flink-docs-master/learn-flink/etl.html#connected-streams

public class StreamingJob {
private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
  
    control
        .connect(streamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    private ValueState<Boolean> blocked;
      
    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }
      
    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }
      
    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(data_value);
        }
    }
}
}

As I understand it, the first parameter control_value of the method flatMap1 should receive the elements of the control stream, and the first parameter data_value of the method flatMap2 should receive the elements of streamOfWords.
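
In other words, my mental model is something like this stripped-down sketch (no state, just tagging each element with the stream it came from; the class name ConnectOrderDemo is made up):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConnectOrderDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> control = env.fromElements("DROP", "IGNORE");
        DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE");

        // connect() pairs the streams in order: the stream connect() is called on
        // (control) should feed flatMap1, and the argument (streamOfWords) should feed flatMap2.
        control.connect(streamOfWords)
                .flatMap(new CoFlatMapFunction<String, String, String>() {
                    @Override
                    public void flatMap1(String controlValue, Collector<String> out) {
                        out.collect("control: " + controlValue);
                    }

                    @Override
                    public void flatMap2(String dataValue, Collector<String> out) {
                        out.collect("words: " + dataValue);
                    }
                })
                .print();

        env.execute("connect-order-demo");
    }
}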

However, when I try to log them, I always get empty values.

Here are my changes:

@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
    LOG.info("flatMap1111111: ", control_value);
}

@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
    LOG.info("flatMap2222222: ", data_value);
}

After executing this job, I can see the following in the log file ./log/flink-root-taskexecutor-0-localhost.localdomain.log:

2020-07-25 02:40:30,152 INFO  myflink.StreamingJob                                          - flatMap1111111:
2020-07-25 02:40:30,153 INFO  myflink.StreamingJob                                          - flatMap1111111:
2020-07-25 02:40:30,174 INFO  myflink.StreamingJob                                          - flatMap2222222:
2020-07-25 02:40:30,174 INFO  myflink.StreamingJob                                          - flatMap2222222:
2020-07-25 02:40:30,174 INFO  myflink.StreamingJob                                          - flatMap2222222:
2020-07-25 02:40:30,174 INFO  myflink.StreamingJob                                          - flatMap2222222:

As you can see, the values are all empty.

Did I do something wrong or misunderstand how connectedStreams work?


Solution

  • Look in the *.out file, not the *.log file. The expected output is:

    2020-07-24 16:18:21,083 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Initializing heap keyed state backend with stream factory.
    3> Apache
    4> Flink
    2020-07-24 16:18:21,126 INFO  org.apache.flink.runtime.taskmanager.Task  
    

    Use out.collect() in flatMap2, otherwise print() has nothing to emit. Here is a working version, which produces the output above. The word stream "Apache", "DROP", "Flink", "IGNORE" is consumed in flatMap2 and filtered based on the keyed state blocked, while the control stream "DROP", "IGNORE" sets blocked in flatMap1 (see the per-key sketch after the code below). As an aside, your log lines come out empty because the SLF4J message is missing a {} placeholder; it would need to be LOG.info("flatMap1111111: {}", control_value).

    package org.sense.flink.examples.stream.tests;
    
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;
    
    public class ConnectedStreamTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
            DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
    
            control
                    .connect(streamOfWords)
                    .flatMap(new ControlFunction())
                    .print();
    
            env.execute();
        }
    
        private static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
            private ValueState<Boolean> blocked;
    
            @Override
            public void open(Configuration config) {
                blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
            }
    
            @Override
            public void flatMap1(String control_value, Collector<String> out) throws Exception {
                blocked.update(Boolean.TRUE);
            }
    
            @Override
            public void flatMap2(String data_value, Collector<String> out) throws Exception {
                if (blocked.value() == null) {
                    out.collect(data_value);
                }
            }
        }
    }
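
    Note that blocked is keyed state rather than a single shared flag: both inputs are keyed by the string itself, so flatMap1 sets blocked only for the keys "DROP" and "IGNORE", and flatMap2 drops exactly those keys. That is why only "Apache" and "Flink" are printed (modulo a race between the two streams: a "DROP" or "IGNORE" word can slip through if it arrives before its control element). The variant below is just a sketch to make the per-key behaviour visible; the class name and the passed:/blocked: labels are illustrative and not part of the original example:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class ConnectedStreamKeyedStateDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("DROP", "IGNORE").keyBy(x -> x)
                    .connect(env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x))
                    .flatMap(new RichCoFlatMapFunction<String, String, String>() {
                        // Keyed state: one Boolean per key (i.e. per word), not a global flag.
                        private transient ValueState<Boolean> blocked;

                        @Override
                        public void open(Configuration config) {
                            blocked = getRuntimeContext()
                                    .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
                        }

                        @Override
                        public void flatMap1(String controlValue, Collector<String> out) throws Exception {
                            blocked.update(Boolean.TRUE); // marks only this key ("DROP" or "IGNORE")
                        }

                        @Override
                        public void flatMap2(String dataValue, Collector<String> out) throws Exception {
                            if (blocked.value() == null) {
                                out.collect("passed: " + dataValue);  // no control element shares this key
                            } else {
                                out.collect("blocked: " + dataValue); // key was marked by the control stream
                            }
                        }
                    })
                    .print();

            env.execute("connected-stream-keyed-state-demo");
        }
    }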