
How can I do multiple window aggregations in apache flink using KeyedProcessFunction implementation?


I want to extend my lower window aggregations to compute higher window aggregations.

My lower window aggregation uses a KeyedProcessFunction, and onTimer is implemented to flush the data to a sink at the end of each window.

My code is similar to the example explained here: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/event_driven/. But I also need to compute aggregations over higher windows:

Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
 ... ->window(1 day) -> out_1 -> out_2 -> out_3 ... ->out_n

How do I extend the lower window results to calculate the higher window aggregations?


Solution

  • You can take the output of a lower windowing layer and send it into a higher one. For example:

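    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    // fares: DataStream[TaxiFare] with event-time timestamps and watermarks assigned
    // Lower layer: sum of tips per driver per hour, emitted as (driverId, tipSum)
    val hourlyTips = fares
        .map((f: TaxiFare) => (f.driverId, f.tip))
        .keyBy(_._1)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .reduce((f1: (Long, Float), f2: (Long, Float)) => (f1._1, f1._2 + f2._2))

    // Higher layer: each hourly result carries its window's max timestamp,
    // so it is assigned to the daily window that contains that hour
    val dailyTips = hourlyTips
        .keyBy(_._1)
        .window(TumblingEventTimeWindows.of(Time.hours(24)))
        .reduce((f1: (Long, Float), f2: (Long, Float)) => (f1._1, f1._2 + f2._2))

    hourlyTips.addSink(...)
    dailyTips.addSink(...)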
    

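    This basic approach should work whether the windowing is implemented with DataStream windows or with windows you implement yourself in a KeyedProcessFunction. In both cases each result is emitted with a timestamp that lies inside the window it summarizes (the window's max timestamp for DataStream windows, the timer's timestamp for records collected in onTimer from an event-time timer), so the next layer assigns it to the correct larger window.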
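Feeding one layer into the next gives exactly the same result as windowing the raw input directly only if two conditions hold: each window size is a whole multiple of the one below it (so that, with epoch-aligned tumbling windows, every 15-second window lies inside exactly one 1-minute window), and the aggregate can be combined from partial results (a sum, count, min or max; an average would need to be carried as a sum and a count). The sketch below is a plain-Scala model with no Flink dependency, written only to check that reasoning under those assumptions. `CascadeModel`, `level` and `cascade` are hypothetical names; each `level` call stands in for one KeyedProcessFunction layer that buckets events by the inclusive window end `ts - (ts % size) + size - 1`, the formula used in the linked tutorial.

```scala
// Framework-free model (no Flink dependency) of cascaded tumbling windows.
// Each call to `level` plays the role of one KeyedProcessFunction layer.
object CascadeModel {
  // (key, timestampMs, value), e.g. (driverId, eventTime, tip)
  type Event = (Long, Long, Float)

  // Inclusive end of the tumbling window containing ts, same formula as the
  // tutorial's processElement (assumes ts >= 0 and windows aligned to the epoch).
  def windowEnd(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs) + sizeMs - 1

  // One layer: sum values per (key, window end), then "fire" each window by
  // emitting (key, windowEnd, sum). Reusing windowEnd as the output timestamp
  // mirrors the timestamp a record collected in onTimer gets from an event-time timer.
  def level(in: Seq[Event], sizeMs: Long): Seq[Event] =
    in.groupBy { case (key, ts, _) => (key, windowEnd(ts, sizeMs)) }
      .toSeq
      .map { case ((key, end), events) => (key, end, events.map(_._3).sum) }
      .sortBy { case (key, end, _) => (end, key) }

  // Feed each layer's output into the next; returns one result list per layer
  // (.tail drops the raw input that scanLeft uses as its seed).
  def cascade(in: Seq[Event], sizesMs: Seq[Long]): Seq[Seq[Event]] = {
    // Exactness requires every size to be a whole multiple of the previous one.
    require(sizesMs.zip(sizesMs.drop(1)).forall { case (a, b) => b % a == 0 },
      "each window size must be a multiple of the one below it")
    sizesMs.scanLeft(in)((prev, size) => level(prev, size)).tail
  }
}
```

In a Flink job, the equivalent is one `keyBy(_._1).process(...)` per layer, with each layer's output sent to its own sink and also used as the input of the next layer.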