I'm working with Apache Flink and using the ConnectedStreams mechanism. Here is my code:
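import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> control = env.fromElements("DROP", "IGNORE");
        DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE");
        control.connect(streamOfWords) // control -> flatMap1, streamOfWords -> flatMap2
            .flatMap(new ControlFunction())
            .print();
        env.execute();
    }
    public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
        private boolean found;
        @Override
        public void open(Configuration config) {
            this.found = false;
        }
        @Override
        public void flatMap1(String control_value, Collector<String> out) throws Exception {
            if (control_value.equals("DROP")) {
                this.found = true;
            } else {
                this.found = false;
            }
        }
        @Override
        public void flatMap2(String data_value, Collector<String> out) throws Exception {
            if (this.found) {
                this.found = false;
            } else {
                // nothing to do
            }
        }
    }
}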
As you can see, I use a boolean variable to control how the stream is processed. The boolean variable found is read and written in both flatMap1 and flatMap2, so I'm wondering whether I need to worry about thread safety.
Does ConnectedStreams guarantee thread safety? If not, does that mean I need to lock the variable found in flatMap1 and flatMap2?
The calls to flatMap1() and flatMap2() are guaranteed not to overlap, so you don't need to worry about concurrent access to your class's variables.
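
To make the "calls do not overlap" point concrete, here is a minimal plain-Java sketch of one thread handing records from two inputs to flatMap1 and flatMap2 one call at a time. It is a simplified model and not Flink's actual runtime code: the class name SingleThreadedDispatch, the run helper, the inCall guard, and the strictly alternating read order are all assumptions made for illustration. In a real job the interleaving of the two inputs is not something you should rely on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified model of a two-input operator. This is NOT Flink's
 * implementation; it only shows a single thread invoking the two callbacks
 * sequentially, so a plain field can be shared between them without a lock.
 */
public class SingleThreadedDispatch {

    /** Mirrors the ControlFunction from the question, without Flink types. */
    static class ControlFunction {
        private boolean found = false;   // plain field: no lock, not volatile
        private boolean inCall = false;  // true only while a callback is running
        final List<String> log = new ArrayList<>();

        void flatMap1(String controlValue) {
            enter();
            found = controlValue.equals("DROP");
            log.add("flatMap1(" + controlValue + ") found=" + found);
            exit();
        }

        void flatMap2(String dataValue) {
            enter();
            if (found) {
                found = false;
            }
            log.add("flatMap2(" + dataValue + ") found=" + found);
            exit();
        }

        // Fails loudly if a callback ever starts while another is still running.
        private void enter() {
            if (inCall) {
                throw new IllegalStateException("overlapping call");
            }
            inCall = true;
        }

        private void exit() {
            inCall = false;
        }
    }

    /** Drains both inputs on the calling thread, alternating between them (assumed order). */
    public static List<String> run(List<String> control, List<String> words) {
        ControlFunction fn = new ControlFunction();
        int i = 0;
        int j = 0;
        while (i < control.size() || j < words.size()) {
            if (i < control.size()) {
                fn.flatMap1(control.get(i++));
            }
            if (j < words.size()) {
                fn.flatMap2(words.get(j++));
            }
        }
        return fn.log;
    }

    public static void main(String[] args) {
        List<String> log = run(
                Arrays.asList("DROP", "IGNORE"),
                Arrays.asList("Apache", "DROP", "Flink", "IGNORE"));
        log.forEach(System.out::println);
    }
}
```

Because every callback runs to completion before the next one starts, the inCall guard never trips and the found field needs no synchronization in this model. The same reasoning is why no lock is needed around found in the question's ControlFunction.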