I'm following the example in the official Flink documentation to try to understand how connectedStreams
work. Here is the example: https://ci.apache.org/projects/flink/flink-docs-master/learn-flink/etl.html#connected-streams
public class StreamingJob {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
        DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
        control
            .connect(streamOfWords)
            .flatMap(new ControlFunction())
            .print();
        env.execute();
    }

    public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
        private ValueState<Boolean> blocked;

        @Override
        public void open(Configuration config) {
            blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
        }

        @Override
        public void flatMap1(String control_value, Collector<String> out) throws Exception {
            blocked.update(Boolean.TRUE);
        }

        @Override
        public void flatMap2(String data_value, Collector<String> out) throws Exception {
            if (blocked.value() == null) {
                out.collect(data_value);
            }
        }
    }
}
As I understand it, the first parameter control_value of the method flatMap1 should receive the elements of the control stream, and the first parameter data_value of the method flatMap2 should receive the elements of streamOfWords.
However, when I try to print them, I always get empty values.
Here are my changes:
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
    LOG.info("flatMap1111111: ", control_value);
}

@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
    LOG.info("flatMap2222222: ", data_value);
}
After executing this job, in the log file ./log/flink-root-taskexecutor-0-localhost.localdomain.log, I can see:
2020-07-25 02:40:30,152 INFO myflink.StreamingJob - flatMap1111111:
2020-07-25 02:40:30,153 INFO myflink.StreamingJob - flatMap1111111:
2020-07-25 02:40:30,174 INFO myflink.StreamingJob - flatMap2222222:
2020-07-25 02:40:30,174 INFO myflink.StreamingJob - flatMap2222222:
2020-07-25 02:40:30,174 INFO myflink.StreamingJob - flatMap2222222:
2020-07-25 02:40:30,174 INFO myflink.StreamingJob - flatMap2222222:
As you can see, they are all empty.
Did I do something wrong, or do I misunderstand how connectedStreams work?
Look in the *.out file, not the *.log file. The expected output is:
2020-07-24 16:18:21,083 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
3> Apache
4> Flink
2020-07-24 16:18:21,126 INFO org.apache.flink.runtime.taskmanager.Task
Call out.collect() in flatMap2; otherwise nothing is emitted downstream and print() has nothing to print. In the working version below, the word stream "Apache", "DROP", "Flink", "IGNORE" is consumed by flatMap2 and filtered using the keyed state blocked, which is shared between the two streams. The control stream "DROP", "IGNORE" is consumed by flatMap1 and sets blocked for its own key. This version produces the output above:
package org.sense.flink.examples.stream.tests;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConnectedStreamTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
        DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
        control
            .connect(streamOfWords)
            .flatMap(new ControlFunction())
            .print();
        env.execute();
    }

    private static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
        private ValueState<Boolean> blocked;

        @Override
        public void open(Configuration config) {
            blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
        }

        @Override
        public void flatMap1(String control_value, Collector<String> out) throws Exception {
            blocked.update(Boolean.TRUE);
        }

        @Override
        public void flatMap2(String data_value, Collector<String> out) throws Exception {
            if (blocked.value() == null) {
                out.collect(data_value);
            }
        }
    }
}
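To see why only Apache and Flink come out, here is a rough plain-Java sketch of the same filtering logic without Flink. It is not the Flink API: the class BlockedWordsSketch and its methods are hypothetical names made up for illustration, a HashMap stands in for the keyed ValueState (one entry per word), and it assumes every control element is processed before any word, an ordering that a real job with two independent sources would not necessarily give you.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ControlFunction; no Flink classes involved.
public class BlockedWordsSketch {
    // Plays the role of the keyed ValueState<Boolean>: one slot per key (word).
    private final Map<String, Boolean> blockedByKey = new HashMap<>();
    // Plays the role of the Collector: everything "emitted" downstream.
    private final List<String> emitted = new ArrayList<>();

    // Mirrors flatMap1: a control element marks its own key as blocked.
    void onControl(String word) {
        blockedByKey.put(word, Boolean.TRUE);
    }

    // Mirrors flatMap2: a word is emitted only if its key was never blocked.
    void onWord(String word) {
        if (blockedByKey.get(word) == null) {
            emitted.add(word);
        }
    }

    public static List<String> run(List<String> control, List<String> words) {
        BlockedWordsSketch sketch = new BlockedWordsSketch();
        // Assumption: all control elements arrive before any word.
        control.forEach(sketch::onControl);
        words.forEach(sketch::onWord);
        return sketch.emitted;
    }

    public static void main(String[] args) {
        List<String> result = run(
            Arrays.asList("DROP", "IGNORE"),
            Arrays.asList("Apache", "DROP", "Flink", "IGNORE"));
        System.out.println(result); // prints [Apache, Flink]
    }
}
```

Because the state is looked up per key, "DROP" on the control side only affects "DROP" on the word side, which is why both streams are keyed with keyBy(x -> x) before they are connected.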