I'm working with Apache Flink and using the ConnectedStreams mechanism. Here is my code:
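    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class StreamingJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> control = env.fromElements("DROP", "IGNORE");
            DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE");

            // Input 1 is the control stream, input 2 is the stream of words.
            control
                .connect(streamOfWords)
                .flatMap(new ControlFunction())
                .print();

            env.execute();
        }

        public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {

            // Plain instance field shared by flatMap1 and flatMap2.
            private boolean found;

            @Override
            public void open(Configuration config) {
                this.found = false;
            }

            // Called for each element of the control stream.
            @Override
            public void flatMap1(String control_value, Collector<String> out) throws Exception {
                if (control_value.equals("DROP")) {
                    this.found = true;
                } else {
                    this.found = false;
                }
            }

            // Called for each element of the word stream.
            @Override
            public void flatMap2(String data_value, Collector<String> out) throws Exception {
                if (this.found) {
                    out.collect(data_value);
                    this.found = false;
                } else {
                    // nothing to do
                }
            }
        }
    }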
As you can see, I use a boolean field, found, to control how the stream is processed. found is read and written in both flatMap1 and flatMap2, so I'm wondering whether I need to worry about thread safety.
Does ConnectedStreams guarantee thread safety? If not, does that mean I need to lock found in flatMap1 and flatMap2?
The calls to flatMap1() and flatMap2() are guaranteed not to overlap, so you don't need to worry about concurrent access to your class's variables.
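To make the "no overlap" guarantee concrete, here is a minimal plain-Java sketch of the calling pattern. It is not Flink code. It assumes, as I understand the runtime, that each parallel instance of a two-input operator has both callbacks invoked one element at a time from a single thread. The class SerialCoFlatMapSketch, its run() helper, and the strictly alternating read order are all hypothetical and exist only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class SerialCoFlatMapSketch {

    // Hypothetical stand-in for the question's ControlFunction; not a Flink class.
    static class ControlFunction {
        // Plain field: no lock and no volatile, because only one thread touches it.
        private boolean found = false;

        void flatMap1(String controlValue, Consumer<String> out) {
            found = controlValue.equals("DROP");
        }

        void flatMap2(String dataValue, Consumer<String> out) {
            if (found) {
                out.accept(dataValue);
                found = false;
            }
        }
    }

    // A single thread drains both inputs, so the two callbacks can never overlap.
    // The alternating order is an assumption of this sketch only.
    static List<String> run(List<String> control, List<String> words) {
        Queue<String> in1 = new ArrayDeque<>(control);
        Queue<String> in2 = new ArrayDeque<>(words);
        ControlFunction fn = new ControlFunction();
        List<String> emitted = new ArrayList<>();
        boolean takeFirst = true;

        while (!in1.isEmpty() || !in2.isEmpty()) {
            if ((takeFirst && !in1.isEmpty()) || in2.isEmpty()) {
                fn.flatMap1(in1.poll(), emitted::add);
            } else {
                fn.flatMap2(in2.poll(), emitted::add);
            }
            takeFirst = !takeFirst;
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> result = run(
            List.of("DROP", "IGNORE"),
            List.of("Apache", "DROP", "Flink", "IGNORE"));
        System.out.println(result);
    }
}
```

With this fixed alternating order the sketch emits only "Apache". In a real job you do not control how elements of the two inputs interleave, so the field is safe from concurrent access, but which words get emitted can vary from run to run.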