Apache Flink Tumbling Window time offset with Table API or SQL


Does anyone know how to define a tumbling window with a time offset? The window size is one day, and the offset is a number of hours that depends on the time zone.

I found examples that do this with the DataStream API, and I am wondering how to achieve the same with the Table API or SQL.

Below is my code using the DataStream API.

DataStream<Tuple2<String, Timestamp>> inputStreamWithTime = inputStream
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Timestamp>>() {
    @Override
    public long extractTimestamp(Tuple2<String, Timestamp> element, long previousElementTimestamp) {
        return element.f1.getTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Timestamp> lastElement, long extractedTimestamp) {
        return new Watermark(extractedTimestamp);
    }
});

inputStreamWithTime
.keyBy(new KeySelector<Tuple2<String,Timestamp>, String>() {
    @Override
    public String getKey(Tuple2<String, Timestamp> in) throws Exception {
        return in.f0;
    }
})
.window(TumblingEventTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))
.aggregate(new CountAggregate(), new ProcessTumblingWindowFunction())
.map((Tuple4<String, Long, Timestamp, Timestamp> value) -> {
    // Keep only the first three fields, matching the Tuple3 declared in returns(...)
    return new Tuple3<String, Long, Timestamp>(value.f0, value.f1, value.f2);
})
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.SQL_TIMESTAMP))
.addSink(getSink());

Thanks in advance.


Solution

  • Unfortunately, there is no way to define such a window in the Table API or SQL. For now, time windows at that API level are always aligned to UTC.

    A possible workaround is to shift the timestamps in the source connector so that the UTC-aligned window produces the correct result. You then need to counter-shift the timestamps in the sink connector. Of course, this hack only works if you don't consume the source in another application.
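
To make the shift and counter-shift concrete, here is a minimal sketch of the arithmetic in plain Java, without any Flink dependency. The class and method names are hypothetical, and the UTC+8 zone is only an assumed example. The idea is that the source adds the zone offset to each timestamp, the UTC-aligned one-day window is computed on the shifted value, and the sink subtracts the offset again.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper that illustrates the workaround; not part of any Flink API.
class ShiftedWindowSketch {
    static final long DAY_MS = Duration.ofDays(1).toMillis();

    // Start of the UTC-aligned one-day window that contains the timestamp.
    static long utcDayWindowStart(long epochMs) {
        return epochMs - Math.floorMod(epochMs, DAY_MS);
    }

    // What the source connector would do: move the event time forward by the zone offset.
    static long shift(long epochMs, long zoneOffsetMs) {
        return epochMs + zoneOffsetMs;
    }

    // What the sink connector would do: move window boundaries back by the zone offset.
    static long counterShift(long epochMs, long zoneOffsetMs) {
        return epochMs - zoneOffsetMs;
    }

    // Shift, apply the UTC-aligned window, then counter-shift.
    static long localDayWindowStart(long epochMs, long zoneOffsetMs) {
        long shifted = shift(epochMs, zoneOffsetMs);
        long windowStart = utcDayWindowStart(shifted);
        return counterShift(windowStart, zoneOffsetMs);
    }

    public static void main(String[] args) {
        long offset = Duration.ofHours(8).toMillis(); // assumed zone: UTC+8
        long event = Instant.parse("2020-01-01T17:00:00Z").toEpochMilli();
        long start = localDayWindowStart(event, offset);
        // 17:00 UTC is 01:00 on Jan 2 in UTC+8, so the local day began at 16:00 UTC on Jan 1.
        System.out.println(Instant.ofEpochMilli(start)); // prints 2020-01-01T16:00:00Z
    }
}
```

For this example zone, the result should line up with what the DataStream API produces for a one-day tumbling window with an offset of Time.hours(-8). Keep in mind that every timestamp between the source and the sink is shifted, so anything that reads those intermediate values sees the wrong time.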