Does anyone know how to do a tumbling window with a time offset? The window size is one day, and the offset is a number of hours that depends on the time zone.
I found examples of doing it with the DataStream API, and I'm wondering how to achieve the same with the Table API/SQL.
Below is my code using the DataStream API.
// Use the Timestamp field (f1) as event time and emit a watermark for every element.
DataStream<Tuple2<String, Timestamp>> inputStreamWithTime = inputStream
    .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Timestamp>>() {
        @Override
        public long extractTimestamp(Tuple2<String, Timestamp> element, long previousElementTimestamp) {
            return element.f1.getTime();
        }

        @Override
        public Watermark checkAndGetNextWatermark(Tuple2<String, Timestamp> lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    });

inputStreamWithTime
    .keyBy(new KeySelector<Tuple2<String, Timestamp>, String>() {
        @Override
        public String getKey(Tuple2<String, Timestamp> in) throws Exception {
            return in.f0;
        }
    })
    // Arguments are (window size, offset): a 60 s window shifted by 10 s here.
    // For a one-day window aligned to UTC+8, the Flink docs use Time.days(1), Time.hours(-8).
    .window(TumblingEventTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))
    .aggregate(new CountAggregate(), new ProcessTumblingWindowFunction())
    // Drop the fourth field; Tuple3 takes exactly three type arguments.
    .map((Tuple4<String, Long, Timestamp, Timestamp> value) ->
        new Tuple3<String, Long, Timestamp>(value.f0, value.f1, value.f2))
    .returns(Types.TUPLE(Types.STRING, Types.LONG, Types.SQL_TIMESTAMP))
    .addSink(getSink());
Thanks in advance.
Unfortunately, there is no way to perform that window in Table API/SQL. Time windows are always defined in UTC at that API level for now.
A possible workaround would be to shift the timestamps in the source connector so that the UTC-aligned window produces the correct result. You then need to counter-shift the timestamps in the sink connector. Of course, this hack only works if no other application consumes the same source.
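To make the shift and counter-shift concrete, here is a minimal sketch of the arithmetic in plain Java, without any Flink dependency. The class and method names (ShiftedDailyWindow, shiftAtSource, unshiftAtSink and so on) are hypothetical and only illustrate the idea; in a real job the shift would live in your source and sink connectors. It also assumes a fixed UTC offset, so daylight saving transitions are not handled.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical helper: illustrates the shift / counter-shift workaround only.
public class ShiftedDailyWindow {
    static final long DAY_MS = Duration.ofDays(1).toMillis();

    // Source side: move the event time by the zone offset so that
    // local midnight lines up with UTC midnight.
    public static long shiftAtSource(long eventTimeMs, ZoneOffset offset) {
        return eventTimeMs + offset.getTotalSeconds() * 1000L;
    }

    // What a UTC-aligned one-day tumbling window does: floor to the day boundary.
    // floorMod keeps the result correct for timestamps before the epoch.
    public static long utcDailyWindowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, DAY_MS);
    }

    // Sink side: undo the shift so downstream systems see real epoch millis again.
    public static long unshiftAtSink(long shiftedMs, ZoneOffset offset) {
        return shiftedMs - offset.getTotalSeconds() * 1000L;
    }

    // Full round trip: start of the local-day window that contains the event.
    public static long localDailyWindowStart(long eventTimeMs, ZoneOffset offset) {
        long shifted = shiftAtSource(eventTimeMs, offset);
        long windowStart = utcDailyWindowStart(shifted);
        return unshiftAtSink(windowStart, offset);
    }

    public static void main(String[] args) {
        ZoneOffset offset = ZoneOffset.ofHours(8); // assumed example zone: UTC+8
        long event = Instant.parse("2020-01-01T20:00:00Z").toEpochMilli();
        long start = localDailyWindowStart(event, offset);
        // 20:00 UTC is 04:00 on Jan 2 in UTC+8, so the window starts at local
        // midnight of Jan 2, which is 16:00 UTC on Jan 1.
        System.out.println(Instant.ofEpochMilli(start)); // prints 2020-01-01T16:00:00Z
    }
}
```

The window itself stays UTC-aligned the whole time; only the timestamps move. That is why every other consumer of the shifted stream would see wrong event times, as noted above.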