
Flink Sliding count window behavior


Suppose we have the following data structure:

Tuple2<ArrayList<Long>, Integer>

The first field is an ArrayList of length one containing a timestamp, and the Integer field is a number between 1 and 40 named channel. The goal is to aggregate every 400 messages with the same key (channel) and apply a ReduceFunction to them (it just merges the timestamps of the 400 messages into the first field of the tuple). I set the channel field as the key for the messages and create a count window of 400. For example, with 160000 input messages it should output 160000/400 = 400 rows, and the count window works as desired. The problem is with the sliding count window. My expected behavior is:

Flink creates a logical window for every channel number and applies the ReduceFunction for the first time once the logical window's length reaches 400. After that, every 100 input elements with the same key as the logical window's key should call the ReduceFunction on the last 400 messages in the window. So we should have:

  • 160000 - 400 = 159600 // the first 400 inputs call the reduce function for the first time
  • 159600 / 100 = 1596 // after the first 400 inputs, Flink calls the reduce function on the last 400 inputs for every 100 new inputs
  • 1 + 1596 = 1597 // the number of output rows
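The arithmetic above counts over the whole stream, but the window is keyed by channel, so Flink counts elements per channel. Assuming the 160000 messages are spread evenly over the 40 channels (4000 per channel, an assumption that matches the 1600 rows reported below as 40 firings × 40 channels), a window that fires only once it holds 400 elements would give 1 + (4000 - 400)/100 = 37 rows per channel. A small sketch to redo the count; the class and method names are hypothetical:

```java
public class ExpectedFirings {
    // Firings for one key if the window fires only once it is full
    // (`size` elements) and then again every `slide` elements.
    static long fullWindowFirings(long elementsPerKey, long size, long slide) {
        if (elementsPerKey < size) return 0;
        return 1 + (elementsPerKey - size) / slide;
    }

    // Firings for one key when a count trigger fires every `slide`
    // elements regardless of how many elements the window holds.
    static long countTriggerFirings(long elementsPerKey, long slide) {
        return elementsPerKey / slide;
    }

    public static void main(String[] args) {
        long channels = 40;
        long perKey = 160_000 / channels; // assumes an even spread: 4000 per channel
        System.out.println(channels * fullWindowFirings(perKey, 400, 100)); // 1480
        System.out.println(channels * countTriggerFirings(perKey, 100));    // 1600
    }
}
```

Calling `fullWindowFirings(160_000, 400, 100)` reproduces the unkeyed figure of 1597.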

But when I run the sliding count window, it outputs 1600 rows of variable length. (I expected every output to have length 400.)

Note: by length I mean the size of the ArrayList (the first field of the Tuple2).

  • The first 40 rows (one per channel) --> length 100
  • The next 40 rows --> length 299
  • The third 40 rows --> length 598
  • The fourth 40 rows --> length 997
  • All remaining rows --> length 400

How can I explain this behavior, and how can I implement my desired sliding count window?

Here is the source code:

DataStream<Tuple2<ArrayList<Long>, Integer>> data; // source omitted
data.keyBy(1)                 // key by channel (field 1)
    .countWindow(400, 100)    // size 400, slide 100
    .reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
        @Override
        public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0,
                                                       Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
            // append t1's timestamp to t0's list (modifies t0 in place)
            t0.f0.add(t1.f0.get(0));
            return t0;
        }
    })
    .writeAsText("results400").setParallelism(1);

Update: Following @DavidAnderson's suggestion, I also tried creating a new Tuple in the ReduceFunction instead of modifying t0, but it resulted in the same output.

public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0,
                                               Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
    ArrayList<Long> times = t0.f0; // same list object as t0.f0, not a copy
    times.addAll(t1.f0);
    return new Tuple2<>(times, t0.f1);
}
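This version still modifies its input: `times` is just another reference to `t0.f0`, so `times.addAll(...)` grows the list stored inside `t0` exactly as before, which would explain why the output did not change. A plain-Java illustration with no Flink dependency; `java.util.function.BinaryOperator` stands in for `ReduceFunction`, and the class and constant names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class AliasingDemo {
    // Same shape as the reduce in the update: `times` aliases `a`.
    static final BinaryOperator<List<Long>> ALIASING = (a, b) -> {
        List<Long> times = a; // no copy: this is the same list object as a
        times.addAll(b);
        return times;
    };

    // Copies first, so neither input list is modified.
    static final BinaryOperator<List<Long>> COPYING = (a, b) -> {
        List<Long> times = new ArrayList<>(a);
        times.addAll(b);
        return times;
    };

    public static void main(String[] args) {
        List<Long> first = new ArrayList<>(List.of(1L));
        ALIASING.apply(first, List.of(2L));
        System.out.println(first.size()); // 2: the input was changed

        List<Long> other = new ArrayList<>(List.of(1L));
        COPYING.apply(other, List.of(2L));
        System.out.println(other.size()); // 1: the input is untouched
    }
}
```

In the Flink reduce, the equivalent change would be `ArrayList<Long> times = new ArrayList<>(t0.f0);`.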

Solution

  • This is the implementation of countWindow

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        return window(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }
    

    which does not behave in quite the way you expect. The window is triggered every 100 elements (the slide), whether or not it contains 400 elements (the size). The size only controls the maximum number of elements kept in the window.
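The odd lengths (100, 299, 598, 997, then 400) plausibly come from this combined with the reduce function mutating `t0`. The following is an assumption rather than confirmed Flink internals: with an evictor, Flink keeps every element of the window in state and, on each firing, folds them with the ReduceFunction starting from the first stored element itself. If that element is held by reference (as with the heap state backend), whatever the reduce appends to its list survives until the evictor drops it. The plain-Java simulation below has no Flink dependency, and all names in it are hypothetical. It models one channel's window as a list: add the element, fire every `slide` elements, evict the oldest down to `size`, then reduce. With the mutating reduce it reproduces the reported lengths, and 40 firings per channel × 40 channels gives the 1600 rows.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingCountSim {
    // Hypothetical model of ONE key of countWindow(size, slide) fed n elements.
    // mutate = true mimics the question's reduce (appends to t0's own list);
    // mutate = false behaves like a reduce that copies t0's list first.
    // Returns the list length emitted at each firing.
    static List<Integer> run(int n, int size, int slide, boolean mutate) {
        List<List<Long>> window = new ArrayList<>(); // elements held in window state
        List<Integer> emitted = new ArrayList<>();
        int count = 0; // trigger-style counter, reset after each firing
        for (long ts = 0; ts < n; ts++) {
            List<Long> element = new ArrayList<>();
            element.add(ts); // each element starts as a one-timestamp list
            window.add(element);
            if (++count < slide) continue; // fire only every `slide` elements
            count = 0;
            // Evictor-style: drop the oldest elements until at most `size` remain
            while (window.size() > size) window.remove(0);
            // Fold starting from the first stored element itself (by reference)
            List<Long> acc = window.get(0);
            if (!mutate) acc = new ArrayList<>(acc); // copying leaves stored elements intact
            for (int i = 1; i < window.size(); i++) acc.add(window.get(i).get(0));
            emitted.add(acc.size());
        }
        return emitted;
    }

    public static void main(String[] args) {
        int channels = 40, perChannel = 4000; // assumes an even spread over channels
        List<Integer> mutating = run(perChannel, 400, 100, true);
        System.out.println(mutating.subList(0, 6));     // [100, 299, 598, 997, 400, 400]
        System.out.println(mutating.size() * channels); // 1600
        List<Integer> copying = run(perChannel, 400, 100, false);
        System.out.println(copying.subList(0, 6));      // [100, 200, 300, 400, 400, 400]
        long full = copying.stream().filter(len -> len == 400).count();
        System.out.println(full * channels);            // 1480
    }
}
```

If this reading is right, two untested changes to the Flink job suggest themselves: copy the list in the reduce (for example with `new ArrayList<>(t0.f0)`) so stored elements are never modified, and discard the partial firings, for instance with a `filter` keeping only tuples whose `f0.size()` is 400 after the `reduce`. Under the even-spread assumption that leaves 37 rows per channel. A custom `Trigger` that fires only once the window holds 400 elements would be the other way to get the same effect.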