
Is ConnectedStreams thread-safe in Apache Flink?


I'm working with Apache Flink and using the ConnectedStreams mechanism. Here is my code:

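```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> control = env.fromElements("DROP", "IGNORE");
        DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE");

        // Connect the control stream (input 1) with the word stream (input 2).
        control
            .connect(streamOfWords)
            .flatMap(new ControlFunction())
            .print();

        env.execute();
    }

    public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
        // Set by the control input, consumed by the data input.
        private boolean found;

        @Override
        public void open(Configuration config) {
            this.found = false;
        }

        // Called for each element of the first (control) input.
        @Override
        public void flatMap1(String control_value, Collector<String> out) throws Exception {
            if (control_value.equals("DROP")) {
                this.found = true;
            } else {
                this.found = false;
            }
        }

        // Called for each element of the second (data) input.
        @Override
        public void flatMap2(String data_value, Collector<String> out) throws Exception {
            if (this.found) {
                out.collect(data_value);
                this.found = false;
            } else {
                // nothing to do
            }
        }
    }
}
```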

As you can see, I use a boolean variable, found, to control how the stream is processed. It is written in flatMap1 and both read and written in flatMap2, so I'm wondering whether I need to worry about thread safety.

Does ConnectedStreams guarantee thread safety? If not, do I need to lock the variable found in both flatMap1 and flatMap2?
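To see what found does on its own, here is a minimal plain-Java replay of ControlFunction's logic with no Flink dependency. It is a sketch under stated assumptions: ControlReplay and the "1:"/"2:" prefix encoding are hypothetical, and the list stands in for Collector<String>. It also shows that the result depends on the order in which elements of the two inputs reach the function.

```java
import java.util.ArrayList;
import java.util.List;

public class ControlReplay {

    // Plain-Java copy of ControlFunction's logic; `out` stands in for Collector<String>.
    static class ControlLogic {
        private boolean found = false;
        final List<String> out = new ArrayList<>();

        void flatMap1(String controlValue) {
            // Any control value other than "DROP" clears the flag.
            found = controlValue.equals("DROP");
        }

        void flatMap2(String dataValue) {
            // Emit one data element after a "DROP" control value, then reset the flag.
            if (found) {
                out.add(dataValue);
                found = false;
            }
        }
    }

    // Replays one hypothetical arrival order: "1:x" goes to flatMap1, "2:x" to flatMap2.
    static List<String> replay(List<String> arrivals) {
        ControlLogic logic = new ControlLogic();
        for (String a : arrivals) {
            String value = a.substring(2);
            if (a.startsWith("1:")) {
                logic.flatMap1(value);
            } else {
                logic.flatMap2(value);
            }
        }
        return logic.out;
    }

    public static void main(String[] args) {
        // Control "DROP" arrives first: prints [Apache]
        System.out.println(replay(List.of("1:DROP", "2:Apache", "2:DROP", "1:IGNORE", "2:Flink", "2:IGNORE")));
        // Control "DROP" arrives late and is cleared by "IGNORE": prints []
        System.out.println(replay(List.of("2:Apache", "2:DROP", "1:DROP", "1:IGNORE", "2:Flink", "2:IGNORE")));
    }
}
```

The same six elements produce different output depending only on how the two inputs interleave.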


Solution

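  • The calls to flatMap1() and flatMap2() are guaranteed not to overlap. Each parallel instance of ControlFunction has its own copy of found, and Flink calls both methods of that instance from a single task thread, one element at a time, so you don't need to lock found or otherwise worry about concurrent access to your class's fields.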
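The guarantee can be pictured with a simplified, Flink-free model. This is an illustrative assumption, not Flink's actual runtime code: two producer threads feed one queue, and a single hypothetical "task-thread" drains it and calls flatMap1 or flatMap2 for each element. Because only that thread ever touches found, the plain boolean needs no lock. SingleThreadDispatchModel, ControlLogic, Tagged, and producer are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SingleThreadDispatchModel {

    // An element tagged with the input it arrived on; value == null marks the end of that input.
    record Tagged(int input, String value) {}

    // ControlFunction's logic without Flink and without any locks.
    static class ControlLogic {
        private boolean found = false;
        final List<String> out = new ArrayList<>();
        final Set<String> callingThreads = new HashSet<>();
        int calls = 0;

        void flatMap1(String controlValue) {
            calls++;
            callingThreads.add(Thread.currentThread().getName());
            found = controlValue.equals("DROP");
        }

        void flatMap2(String dataValue) {
            calls++;
            callingThreads.add(Thread.currentThread().getName());
            if (found) {
                out.add(dataValue);
                found = false;
            }
        }
    }

    // Hypothetical upstream source: pushes its values, then an end marker.
    static Thread producer(BlockingQueue<Tagged> queue, int input, List<String> values) {
        return new Thread(() -> {
            for (String v : values) {
                queue.add(new Tagged(input, v));
            }
            queue.add(new Tagged(input, null));
        });
    }

    public static ControlLogic run() {
        BlockingQueue<Tagged> queue = new LinkedBlockingQueue<>();
        ControlLogic logic = new ControlLogic();

        // The two inputs are produced concurrently by separate threads...
        Thread control = producer(queue, 1, List.of("DROP", "IGNORE"));
        Thread words = producer(queue, 2, List.of("Apache", "DROP", "Flink", "IGNORE"));

        // ...but one task thread takes elements one at a time and calls
        // flatMap1 or flatMap2, so the two methods never run at the same time.
        Thread task = new Thread(() -> {
            int finishedInputs = 0;
            try {
                while (finishedInputs < 2) {
                    Tagged t = queue.take();
                    if (t.value() == null) {
                        finishedInputs++;
                    } else if (t.input() == 1) {
                        logic.flatMap1(t.value());
                    } else {
                        logic.flatMap2(t.value());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "task-thread");

        task.start();
        control.start();
        words.start();
        try {
            control.join();
            words.join();
            task.join(); // join() makes the task thread's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return logic;
    }

    public static void main(String[] args) {
        ControlLogic logic = run();
        System.out.println("calls: " + logic.calls + " from " + logic.callingThreads);
        System.out.println("output: " + logic.out);
    }
}
```

Every run reports six calls, all from task-thread. The output list holds at most one word, and which word (if any) can vary between runs because the producers interleave differently; that ordering question is separate from thread safety.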