
Flink: ValueState in RichFlatMapFunction always returns null


I am trying to find the hashtag that occurs most often within a given tumbling window.

To do this, I run a kind of "word count" over the hashtags and sum the counts. This works fine. After that, I try to find the hashtag with the highest count in the given window. I use a RichFlatMapFunction for this, with a ValueState that stores the current maximum number of occurrences of a single hashtag, but this doesn't work. I debugged my code and found that the value of the ValueState "maxVal" is null in every flatMap step. So the update() and value() methods don't seem to work in my scenario.

Have I misunderstood RichFlatMapFunction or how it is meant to be used?

Here is my code; everything except the last flatMap function works as expected:

public class TwitterHashtagCount {

public static void main(String args[]) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
    DataStream<String> tweetsRaw = env.addSource(new TwitterSource(TwitterConnection.getTwitterConnectionProperties()));

    DataStream<String> tweetsGerman = tweetsRaw.filter(new EnglishLangFilter());

    DataStream<Tuple2<String, Integer>> tweetHashtagCount = tweetsGerman
            .flatMap(new TwitterHashtagFlatMap())
            .keyBy(0)
            .timeWindow(Time.seconds(15))
            .sum(1)
            .flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                private transient ValueState<Integer> maxVal;

                @Override
                public void open(Configuration parameters) throws Exception {
                    ValueStateDescriptor<Integer> descriptor =
                            new ValueStateDescriptor<>(
                                    // state name
                                    "max-val",
                                    // type information of state
                                    TypeInformation.of(Integer.class));
                    maxVal = getRuntimeContext().getState(descriptor);
                }

                @Override
                public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    Integer maxCount = maxVal.value();
                    if(maxCount == null) {
                        maxCount = 0;
                        maxVal.update(0);
                    }

                    if(value.f1 > maxCount) {
                        maxVal.update(value.f1); // remember the new maximum
                        out.collect(new Tuple2<String, Integer>(value.f0, value.f1));
                    }
                }
            });

    tweetHashtagCount.print();


    env.execute("Twitter Streaming WordCount");
}

}

Solution

  • I'm surprised that the code you've shared runs at all. The result of sum(1) is a non-keyed stream, whereas the managed state interface you are using expects a keyed stream and keeps a separate instance of the state for each key. I would have expected an error saying "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation."

    Since you've already windowed the stream, if you do key it again (with the same key) before the RichFlatMapFunction, each key will occur once and maxVal will always be null. And because keyed state is scoped to a single key, it could not track a maximum across different hashtags anyway.

    Something like this might do what you intend, if your goal is to find the max across all hashtags in each time window:

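    tweetsGerman
        .flatMap(new TwitterHashtagFlatMap())
        .keyBy(0)
        // count each hashtag within a 15-second tumbling window
        .timeWindow(Time.seconds(15))
        .sum(1)
        // then take the highest count across all hashtags in each 15-second window
        .timeWindowAll(Time.seconds(15))
        .max(1)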
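
To make the two stages above easier to follow, here is a minimal plain-Java sketch of what such a pipeline computes. It does not use Flink at all; it only simulates the logic with collections. The names WindowMaxSketch, Event, countPerWindow and maxPerWindow, as well as the sample data, are hypothetical and chosen only for illustration. It assumes non-negative timestamps in milliseconds and windows aligned to multiples of the window size.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WindowMaxSketch {

    /** One hashtag occurrence with a (hypothetical) timestamp in milliseconds. */
    static final class Event {
        final String hashtag;
        final long timestampMs;

        Event(String hashtag, long timestampMs) {
            this.hashtag = hashtag;
            this.timestampMs = timestampMs;
        }
    }

    /** Stage 1: count per hashtag inside each tumbling window (keyBy + window + sum). */
    static Map<Long, Map<String, Integer>> countPerWindow(List<Event> events, long windowSizeMs) {
        Map<Long, Map<String, Integer>> counts = new TreeMap<>();
        for (Event e : events) {
            // Start of the tumbling window this event falls into.
            long windowStart = e.timestampMs - (e.timestampMs % windowSizeMs);
            counts.computeIfAbsent(windowStart, k -> new LinkedHashMap<>())
                  .merge(e.hashtag, 1, Integer::sum);
        }
        return counts;
    }

    /** Stage 2: one maximum across all hashtags of each window (windowAll + max). */
    static Map<Long, Map.Entry<String, Integer>> maxPerWindow(Map<Long, Map<String, Integer>> counts) {
        Map<Long, Map.Entry<String, Integer>> winners = new TreeMap<>();
        for (Map.Entry<Long, Map<String, Integer>> window : counts.entrySet()) {
            Map.Entry<String, Integer> best = null;
            for (Map.Entry<String, Integer> c : window.getValue().entrySet()) {
                // Strict comparison: on a tie, the hashtag seen first wins.
                if (best == null || c.getValue() > best.getValue()) {
                    best = new AbstractMap.SimpleImmutableEntry<>(c.getKey(), c.getValue());
                }
            }
            winners.put(window.getKey(), best);
        }
        return winners;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("#flink", 1_000), new Event("#java", 2_000), new Event("#flink", 14_000),
                new Event("#java", 16_000), new Event("#java", 20_000), new Event("#flink", 29_000));

        // Prints:
        // 0 -> #flink x2
        // 15000 -> #java x2
        maxPerWindow(countPerWindow(events, 15_000))
                .forEach((start, top) ->
                        System.out.println(start + " -> " + top.getKey() + " x" + top.getValue()));
    }
}
```

The point of the sketch is that the maximum is taken over all hashtags of a window at once, which is why the second stage is not keyed. One thing that may be worth checking in the real pipeline: Flink documents max(1) as returning the maximum value of that field, while maxBy(1) returns the whole element holding it, so maxBy(1) may be the better fit if the hashtag name has to match the reported count.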