I need an aggregating window in Flink. I can't use an `AggregateFunction` on its own, since the `getResult` calculation requires state access. So I tried using `aggregate` together with a `ProcessWindowFunction`:
```java
.aggregate(
    new AggregateFunction<Entry, Double, Double>() {
        // ...........
    },
    new ProcessWindowFunction<Double, Result, Entry, TimeWindow>() {
        @Override
        public void process(Entry key, // the window's key (KEY type is Entry here)
                            ProcessWindowFunction<Double, Result, Entry, TimeWindow>.Context ctx,
                            Iterable<Double> aggInput,
                            Collector<Result> output) throws Exception {
        }
    });
```
But I only need the latest value of `aggInput`, and I don't see a reason to keep all of those values in memory; in my app there can be billions of records. So I wondered whether I could purge the window data each time.
When you combine an `AggregateFunction` with a `ProcessWindowFunction`, the window state holds only the accumulator, which is updated incrementally as each element arrives. When the window fires, the `Iterable` passed to the `process` method contains just one pre-aggregated entry: the output of `getResult`. (A `ReduceFunction` works the same way.)
In other words, the optimization you are looking for is already built in. The Flink documentation on using a `ProcessWindowFunction` with incremental aggregation has a bit more detail.
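To make the memory behavior concrete, here is a plain-Java model of the arrangement described above. It is not Flink code: the names `IncrementalWindowModel`, `SumAggregate`, `onElement`, and `fire` are hypothetical, and a running sum stands in for the elided aggregate logic. Each element updates a single accumulator per key, and when the window fires the process step receives an `Iterable` holding exactly one pre-aggregated value, no matter how many elements arrived.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT Flink's implementation) of a window that combines an
// incremental aggregate with a process step. Only accumulators are stored.
public class IncrementalWindowModel {

    // Hypothetical stand-in for AggregateFunction<Double, Double, Double>:
    // a running sum, updated once per incoming element.
    static final class SumAggregate {
        Double createAccumulator() { return 0.0; }
        Double add(Double value, Double acc) { return acc + value; }
        Double getResult(Double acc) { return acc; }
    }

    // Hypothetical output type of the process step.
    static final class Result {
        final String key;
        final double value;
        final int elementsSeen; // how many items the Iterable contained

        Result(String key, double value, int elementsSeen) {
            this.key = key;
            this.value = value;
            this.elementsSeen = elementsSeen;
        }
    }

    // Hypothetical stand-in for ProcessWindowFunction.process(key, ctx, elements, out):
    // it reads the (single) pre-aggregated value and counts the Iterable's size.
    static Result process(String key, Iterable<Double> aggInput) {
        int n = 0;
        double last = Double.NaN;
        for (Double d : aggInput) {
            last = d;
            n++;
        }
        return new Result(key, last, n);
    }

    private final SumAggregate agg = new SumAggregate();
    // Window state: one accumulator per key, never the raw elements.
    private final Map<String, Double> accumulators = new HashMap<>();

    void onElement(String key, double value) {
        Double acc = accumulators.get(key);
        if (acc == null) {
            acc = agg.createAccumulator();
        }
        accumulators.put(key, agg.add(value, acc));
    }

    int storedStateEntries() {
        return accumulators.size();
    }

    // On firing, each key's process call sees a one-element Iterable.
    List<Result> fire() {
        List<Result> out = new ArrayList<>();
        for (Map.Entry<String, Double> e : accumulators.entrySet()) {
            Double result = agg.getResult(e.getValue());
            out.add(process(e.getKey(), Collections.singletonList(result)));
        }
        accumulators.clear(); // this model discards window contents after firing
        return out;
    }
}
```

Feeding a million elements for one key still leaves a single stored accumulator for that key, which is the point the answer makes: the per-window memory cost depends on the accumulator size, not on the number of elements.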