apache-flink, flink-streaming

Apache Flink: How to apply multiple counting window functions?


I have a keyed stream of data and need to compute counts over tumbling windows of several different sizes (1 minute, 5 minutes, 1 day, 1 week).

Is it possible to compute all four window counts in a single application?


Solution

  • Yes, that's possible.

    If you are using event-time, you can simply cascade the windows with increasing time intervals. So you do:

    DataStream<String> data = ...
    // append a Long 1 to each record to count it
    DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne());

    DataStream<Tuple2<String, Long>> oneMinCnts = withOnes
      // key by the String field
      .keyBy(0)
      // define a 1-minute tumbling event-time window
      .timeWindow(Time.minutes(1))
      // sum the ones in the Long field
      // in practice you want to use an incrementally aggregating ReduceFunction and
      // a WindowFunction to extract the start/end timestamp of the window (see below)
      .sum(1);

    // emit 1-min counts to wherever you need them
    oneMinCnts.addSink(new YourSink());

    // compute 5-min counts based on the 1-min counts
    DataStream<Tuple2<String, Long>> fiveMinCnts = oneMinCnts
      // key by the String field
      .keyBy(0)
      // define a 5-minute tumbling event-time window
      .timeWindow(Time.minutes(5))
      // sum the 1-minute counts in the Long field
      .sum(1);

    // emit 5-min counts to wherever you need them
    fiveMinCnts.addSink(new YourSink());

    // continue the cascade with a 1-day window and a 1-week window
    
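    The AppendOne mapper is not spelled out above; assuming the input records are plain Strings, a minimal sketch could look like this:

    // sketch of the (hypothetical) AppendOne mapper used above:
    // it attaches a Long 1 to every record so the counts can be summed downstream
    public static class AppendOne implements MapFunction<String, Tuple2<String, Long>> {
      @Override
      public Tuple2<String, Long> map(String value) {
        return new Tuple2<>(value, 1L);
      }
    }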

    Note that this is possible, because:

    1. Sum is an associative function (you can compute a sum by summing partial sums).
    2. The tumbling windows are nicely aligned and do not overlap.

    Regarding the comment on the incrementally aggregating ReduceFunction:

    Usually, you want to have the start and/or end timestamp of the window in the output of a window operation (otherwise all results for the same key look the same). The start and end time of a window can be accessed from the window parameter of the apply() method of a WindowFunction. However, a WindowFunction does not incrementally aggregate records but collects them and aggregates the records at the end of the window. Hence, it is more efficient to use a ReduceFunction for incremental aggregation and a WindowFunction to append the start and/or end time of the window to the result. The documentation discusses the details.
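    As a rough sketch of that combination (assuming the withOnes stream from above and keying by field position, so the key type seen by the WindowFunction is Tuple):

    // incrementally sum the counts with a ReduceFunction and let a WindowFunction
    // append the window end timestamp to the pre-aggregated result
    DataStream<Tuple3<String, Long, Long>> oneMinCntsWithTime = withOnes
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .reduce(
        new ReduceFunction<Tuple2<String, Long>>() {
          @Override
          public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
            // only one partial sum per key and window is kept in state
            return new Tuple2<>(a.f0, a.f1 + b.f1);
          }
        },
        new WindowFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>, Tuple, TimeWindow>() {
          @Override
          public void apply(Tuple key, TimeWindow window,
                            Iterable<Tuple2<String, Long>> counts,
                            Collector<Tuple3<String, Long, Long>> out) {
            // the iterable holds exactly one element: the pre-aggregated count
            Tuple2<String, Long> cnt = counts.iterator().next();
            out.collect(new Tuple3<>(cnt.f0, cnt.f1, window.getEnd()));
          }
        });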

    If you want to compute this using processing time, you cannot cascade the windows but have to fan out from the input data stream to four window functions.
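    A minimal sketch of that fan-out (reusing the withOnes stream from above instead of the cascade):

    // with processing time the windows of different operators are not aligned,
    // so every window size is computed directly from the input stream
    DataStream<Tuple2<String, Long>> oneMinCnts  = withOnes.keyBy(0).timeWindow(Time.minutes(1)).sum(1);
    DataStream<Tuple2<String, Long>> fiveMinCnts = withOnes.keyBy(0).timeWindow(Time.minutes(5)).sum(1);
    DataStream<Tuple2<String, Long>> dailyCnts   = withOnes.keyBy(0).timeWindow(Time.days(1)).sum(1);
    DataStream<Tuple2<String, Long>> weeklyCnts  = withOnes.keyBy(0).timeWindow(Time.days(7)).sum(1);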