Inside a Flink task I need to call a remote web service to fetch some data when an event arrives. I don't want to call the web service for every event, so I need to cache the data in local memory where all tasks in the process can access it. How can I do this? Should I store the data in a private static variable at the class level?
In the following example, if I declare the variable localCache as an instance field of the Splitter class, the data is cached at the operator level instead of the process level.
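    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class WindowWordCount {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<String, Integer>> dataStream = env
                    .socketTextStream("localhost", 9999)
                    .flatMap(new Splitter())
                    .keyBy(0)
                    .timeWindow(Time.seconds(5))
                    .sum(1);

            dataStream.print();
            env.execute("Window WordCount");
        }

        public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            // The field in question: as an instance field, each Splitter instance gets its own copy.
            private Object localCache;

            @Override
            public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : sentence.split(" ")) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    }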
Exactly as you said: use a static variable in a RichFlatMapFunction and initialize it in open(). open() is called on every TaskManager, once per Splitter instance, before any record is fed in. Note that a separate Splitter instance is created for each slot, so in most cases several Splitter instances run on one TaskManager. You therefore need to guard against double creation, and because those instances run in different threads, the guard should be synchronized.
    public static class Splitter extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
        // static: shared by all Splitter instances in the same TaskManager JVM
        private static Object localCache;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Several Splitter instances may run open() concurrently, so lock on the class.
            synchronized (Splitter.class) {
                if (localCache == null) {
                    localCache = ... ;
                }
            }
        }

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
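
If the cached data is looked up per key rather than loaded all at once, the same idea can be sketched without any Flink classes. The following is only an illustration under assumptions: ProcessLocalCache, fetchFromRemoteService and REMOTE_CALLS are hypothetical names, the "remote service" is simulated by a local method, and the threads stand in for several Splitter instances running in one TaskManager process.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper: one cache per JVM, shared by every thread in the process.
final class ProcessLocalCache {
    // static, so all callers in the process see the same map
    private static final Map<String, String> CACHE = new ConcurrentHashMap<>();

    // Counts how often the simulated remote service is actually called.
    static final AtomicInteger REMOTE_CALLS = new AtomicInteger();

    private ProcessLocalCache() {}

    // Returns the cached value for key; the loader runs at most once per key.
    static String get(String key, Function<String, String> loader) {
        // ConcurrentHashMap.computeIfAbsent applies the function atomically,
        // so concurrent callers asking for the same key do not load it twice.
        return CACHE.computeIfAbsent(key, loader);
    }

    // Assumption: stand-in for the real web service call.
    static String fetchFromRemoteService(String key) {
        REMOTE_CALLS.incrementAndGet();
        return "value-for-" + key;
    }

    public static void main(String[] args) throws InterruptedException {
        // Four threads play the role of four Splitter instances in one process.
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (String word : "to be or not to be".split(" ")) {
                    get(word, ProcessLocalCache::fetchFromRemoteService);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        // Four distinct words, so the simulated service is called four times.
        System.out.println("remote calls: " + REMOTE_CALLS.get()); // prints "remote calls: 4"
    }
}
```

In a job like the one above, flatMap could call such a helper for each word instead of holding the cache in an instance field. Two caveats apply: the loader blocks other threads that ask for the same key while it runs, so a slow web service call delays them, and the loader must not return null because computeIfAbsent then stores nothing.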