java, apache-flink, flink-streaming

Flink how to compute over aggregated output of a keyed window


Is it possible in Flink to compute over aggregated output of a keyed window?

We have a DataStream and call keyBy() on a field composed of a letter and a number (for example A01, A02, ..., A10, B01, B02, ..., B10, etc.), like the squares of a chessboard. After the keyBy() we call window(TumblingEventTimeWindows.of(Time.days(7))), which creates a weekly window. After this, we call reduce() and as a result obtain a SingleOutputStreamOperator&lt;Result&gt;.

Now we want to group the SingleOutputStreamOperator&lt;Result&gt; by a field of each Result object and, for each group, extract the top 3 elements based on another field of the Result objects. Is it possible to do this without creating a second weekly window and running an aggregation function on it? The two-window approach works, but I don't like the idea of stacking a second weekly window after the first. I would like to merge all the results of the first window and run a function on them without a new window that receives all the elements at once.

Here is my code:

  1. We call keyBy() with a Tuple2&lt;String, Integer&gt; built from fields of the Query2IntermediateOutcome object. The String in the tuple is the code A01, ..., A10 mentioned above.

  2. The code window(timeIntervalConstructor.newInstance()) basically creates a weekly window.

  3. We call reduce() so for each key we have an aggregated value.

  4. Now we use another keyBy(); this time the key is computed from the number in the code A01, ..., A10: if it is greater than 5 the cell belongs to one sea type, otherwise to the other.

  5. Again, window(timeIntervalConstructor.newInstance()) for the second weekly window.

  6. Finally, in the aggregate() we compute the top3 for each group.

            .keyBy(new KeySelector<Query2IntermediateOutcome, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> getKey(Query2IntermediateOutcome intermediateOutcome) throws Exception {
                    return new Tuple2<String, Integer>(intermediateOutcome.getCellId(), intermediateOutcome.getHourInDate());
                }
            })
            .window(timeIntervalConstructor.newInstance())
            .reduce(new ReduceFunction<Query2IntermediateOutcome>() {
                @Override
                public Query2IntermediateOutcome reduce(Query2IntermediateOutcome t1, Query2IntermediateOutcome t2) throws Exception {
                    t1.setAttendance(t1.getAttendance()+t2.getAttendance());
                    return t1;
                }
            })
            .keyBy(new KeySelector<Query2IntermediateOutcome, String>() {
                @Override
                public String getKey(Query2IntermediateOutcome query2IntermediateOutcome) throws Exception {
                    return query2IntermediateOutcome.getSeaType().toString();
                }
            })
            .window(timeIntervalConstructor.newInstance())
            .aggregate(new Query2FinalAggregator(), new Query2Window())
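For reference, the top-3 selection that the final aggregate performs boils down to plain Java. A minimal sketch, with no Flink dependency (the `Result` record and its `attendance` field are hypothetical stand-ins for the question's classes):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class Top3Sketch {
    // Hypothetical stand-in for the question's Result objects.
    record Result(String cellId, int attendance) {}

    // Keep only the three results with the highest attendance.
    static List<Result> top3(List<Result> results) {
        List<Result> sorted = new ArrayList<>(results);
        sorted.sort(Comparator.comparingInt(Result::attendance).reversed());
        return sorted.subList(0, Math.min(3, sorted.size()));
    }

    public static void main(String[] args) {
        List<Result> input = List.of(
                new Result("A01", 5), new Result("A07", 12),
                new Result("B03", 9), new Result("B10", 1));
        System.out.println(top3(input)); // highest-attendance results first
    }
}
```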
    

This solution works, but I don't really like it: the second window receives all the data only when the first one fires, which happens weekly, so it gets all the elements at once and must immediately run the aggregate().


Solution

  • I think it would be reasonably straightforward to collapse all of this business logic into one KeyedProcessFunction. Then you could avoid the burst of activity at the end of the week.

    Take a look at this tutorial in the Flink docs for an example of how to replace a keyed window with a KeyedProcessFunction.
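As a rough sketch of that suggestion (not the actual solution code), a KeyedProcessFunction keyed by sea type could keep per-cell attendance in MapState and fire one event-time timer at the end of each week. The `Query2IntermediateOutcome` stand-in, its getter names, and the epoch-aligned week boundary are all assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Minimal stand-in for the question's class (assumption).
interface Query2IntermediateOutcome {
    String getCellId();
    long getAttendance();
}

// Keyed by sea type; accumulates attendance per cell and emits the top 3 once a week.
public class WeeklyTop3 extends KeyedProcessFunction<String, Query2IntermediateOutcome, String> {

    private static final long WEEK_MS = 7L * 24 * 60 * 60 * 1000;

    private transient MapState<String, Long> attendancePerCell;

    @Override
    public void open(Configuration parameters) {
        attendancePerCell = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("attendance", String.class, Long.class));
    }

    @Override
    public void processElement(Query2IntermediateOutcome value, Context ctx,
                               Collector<String> out) throws Exception {
        Long current = attendancePerCell.get(value.getCellId());
        attendancePerCell.put(value.getCellId(),
                (current == null ? 0L : current) + value.getAttendance());
        // Assumes event-time timestamps are assigned (ctx.timestamp() non-null).
        // Re-registering the same timestamp is a no-op, so this fires once per week.
        ctx.timerService().registerEventTimeTimer(endOfWeek(ctx.timestamp()));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Sort the accumulated cells by attendance, emit the top 3, then reset for the next week.
        List<Map.Entry<String, Long>> entries = new ArrayList<>();
        for (Map.Entry<String, Long> e : attendancePerCell.entries()) {
            entries.add(e);
        }
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        for (int i = 0; i < Math.min(3, entries.size()); i++) {
            out.collect(ctx.getCurrentKey() + " -> " + entries.get(i));
        }
        attendancePerCell.clear();
    }

    // End of the event-time week containing timestamp (aligned to the epoch).
    static long endOfWeek(long timestamp) {
        return timestamp - (timestamp % WEEK_MS) + WEEK_MS - 1;
    }
}
```

With this shape the per-key state grows gradually over the week and the only end-of-week work is one sort per key in onTimer, instead of a second window receiving everything at once.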