I'm calculating a simple mean over some records, using different window sizes. With 1-hour and 1-week windows there are no problems, and the results are computed correctly.
var keyed = src
.filter(event -> event.getSensor_id() < 10000)
.keyBy(Event::getSensor_id);
var hourResult = keyed
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new AvgQ1(Config.HOUR))
.setParallelism(5);
var weekResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(7)))
.aggregate(new AvgQ1(Config.WEEK))
.setParallelism(5);
With a 1-month (31-day) window, however, the window is split in two, and Flink outputs two results: one for records from 05-01 to 05-14 and one for records from 05-15 to 05-31.
SingleOutputStreamOperator<OutputQuery> monthResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(31)))
.aggregate(new AvgQ1(Config.MONTH))
.setParallelism(5);
With a 30-day window, the result is instead split into (05-01; 05-27) and (05-28; 05-31).
SingleOutputStreamOperator<OutputQuery> monthResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(30)))
.aggregate(new AvgQ1(Config.MONTH))
.setParallelism(5);
This is the AggregateFunction:
public class AvgQ1 implements AggregateFunction<Event, AccumulatorQ1, OutputQuery> {
String windowType;
public AvgQ1(String windowType) {
this.windowType = windowType;
}
@Override
public AccumulatorQ1 createAccumulator() {
return new AccumulatorQ1();
}
@Override
public AccumulatorQ1 add(Event values, AccumulatorQ1 acc) {
acc.sum += values.getTemperature();
acc.sensor_id = values.getSensor_id();
acc.last_timestamp = values.getTimestamp();
acc.count++;
return acc;
}
@Override
public AccumulatorQ1 merge(AccumulatorQ1 a, AccumulatorQ1 b) {
a.count += b.count;
a.sum += b.sum;
return a;
}
@Override
public OutQ1 getResult(AccumulatorQ1 acc) {
double mean = acc.sum / (double) acc.count;
OutQ1 result = new OutQ1(windowType);
result.setSensor_id(acc.sensor_id);
result.setTemperature(mean);
result.setOccurrences(acc.count);
if (windowType.equals(Config.HOUR)) {
result.setTimestamp(Tools.getHourSlot(acc.last_timestamp));
}
if (windowType.equals(Config.WEEK)) {
result.setTimestamp(Tools.getWeekSlot(acc.last_timestamp));
}
if (windowType.equals(Config.MONTH)) {
result.setTimestamp(Tools.getMonthSlot(acc.last_timestamp));
}
return result;
}
}
I think the problem is somehow related to memory usage, as if the accumulator or the window couldn't hold that much data. I monitored JVM heap usage in the web UI, but it never exceeds the limit, and I also tried changing the state backend from hashmap to RocksDB.
I'm running Flink on Docker and reading the DataStream from a Kafka topic. Any ideas?
Thanks to @david-anderson's suggestion, I solved the problem. With a 31-day window over a dataset containing values for May 2022, the Flink window ran from 14-04-2022 to 15-05-2022 instead of from 01-05-2022 to 31-05-2022. This is because (as @david-anderson said):
Flink's time-based window divides the time since the Unix epoch (01-01-1970).
So my solution is to delay the start of the window by applying an offset of 17 days (from 14-04 to 01-05) in the window() operator.
var monthResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(31),Time.days(17)))
.aggregate(new AvgQ1(Config.MONTH))
.name("Monthly Window Mean AggregateFunction");
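To make the epoch alignment concrete, here is a minimal plain-Java sketch of the arithmetic, with no Flink dependency. The class and method names (WindowAlignment, windowStart, windowFor) are made up for illustration, and the formula is my understanding of how a tumbling window start is derived from a timestamp, a window size and an offset, so treat it as an approximation rather than Flink's actual code. Timestamps are assumed to be epoch milliseconds in UTC.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Flink: reproduces the window alignment arithmetic.
final class WindowAlignment {

    // Start (epoch millis) of the tumbling window that contains `timestamp`.
    // Windows begin at epoch + offset + k * size for integer k.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Formats the window [start, end) that contains midnight UTC of `day`.
    static String windowFor(LocalDate day, int sizeDays, int offsetDays) {
        long ts = day.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        long size = Duration.ofDays(sizeDays).toMillis();
        long offset = Duration.ofDays(offsetDays).toMillis();
        long start = windowStart(ts, offset, size);
        return toDate(start) + " -> " + toDate(start + size);
    }

    static LocalDate toDate(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2022, 5, 10);
        System.out.println(windowFor(day, 31, 0));  // 2022-04-14 -> 2022-05-15
        System.out.println(windowFor(day, 31, 17)); // 2022-05-01 -> 2022-06-01
    }
}
```

The end of each window is exclusive, so the second line covers exactly the records of May 2022.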
The offset depends on the specific month, so I used a ProcessWindowFunction just to print the start and end of each window, and then worked out the difference in days between the window Flink used and the desired window.
public class DebugProcess extends ProcessWindowFunction<Event, Tuple2<Long,Integer>, Long, TimeWindow> {
@Override
public void process(Long sensor_id, ProcessWindowFunction<Event, Tuple2<Long,Integer>, Long, TimeWindow>.Context context, Iterable<Event> iterable, Collector<Tuple2<Long,Integer>> collector) throws Exception {
Timestamp end = new Timestamp(context.window().getEnd());
Timestamp start = new Timestamp(context.window().getStart());
System.out.printf("WINDOW: (%s,%s)\n", start,end);
...
}
}
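Instead of printing the windows and counting days by hand, the offset can probably also be computed directly from the desired start date. The sketch below is plain Java with a hypothetical helper (WindowOffset.offsetFor is not a Flink API), and it assumes window boundaries are meant to fall on midnight UTC.

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Flink.
final class WindowOffset {

    // Offset that makes a tumbling window of length `size` begin at midnight UTC of `start`.
    static Duration offsetFor(LocalDate start, Duration size) {
        long startMillis = start.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        return Duration.ofMillis(Math.floorMod(startMillis, size.toMillis()));
    }

    public static void main(String[] args) {
        LocalDate may1 = LocalDate.of(2022, 5, 1);
        System.out.println(offsetFor(may1, Duration.ofDays(31)).toDays());   // 17
        System.out.println(offsetFor(may1, Duration.ofDays(30)).toDays());   // 3
        System.out.println(offsetFor(may1, Duration.ofDays(7)).toDays());    // 3
        System.out.println(offsetFor(may1, Duration.ofHours(1)).toMillis()); // 0
    }
}
```

The last line also shows why the hourly query never needed an offset: midnight is already a multiple of one hour since the epoch.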
Analyzing my results more carefully, I noticed that the one-week query had the same problem: the window ran from 28-04-2022 to 05-05-2022 instead of from 01-05-2022 to 08-05-2022. So I applied an offset of 3 days there as well:
var weekResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(7),Time.days(3)))
.aggregate(new AvgQ1(Config.WEEK))
.name("Weekly Window Mean AggregateFunction");
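As a sanity check on the 3-day value: 01-01-1970 was a Thursday, so an unshifted 7-day window always starts on a Thursday, and 3 days later is a Sunday, which is the weekday of 01-05-2022. A small plain-Java sketch of that check follows; WeeklyWindowStart is a hypothetical name, and days are assumed to be counted in UTC.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;

// Hypothetical helper, not part of Flink.
final class WeeklyWindowStart {

    // Weekday on which every 7-day tumbling window starts for a given offset in days.
    // Windows start at epoch + offset + k * 7 days, so k = 0 is representative.
    static DayOfWeek startDay(int offsetDays) {
        return LocalDate.ofEpochDay(offsetDays).getDayOfWeek();
    }

    public static void main(String[] args) {
        System.out.println(startDay(0));                             // THURSDAY
        System.out.println(startDay(3));                             // SUNDAY
        System.out.println(LocalDate.of(2022, 5, 1).getDayOfWeek()); // SUNDAY
    }
}
```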