
Apache Flink: using TumblingProcessingTimeWindows with TimeCharacteristic.EventTime


It looks like TumblingProcessingTimeWindows always uses "ingestion time", even though I set TimeCharacteristic.EventTime. Is there any way to force windowing on event time?

My use case is quite simple: I receive events that carry an event timestamp, and I want them aggregated based on that event time.

For example, with the following code I expect two outputs:

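import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowExample {

    private static final SimpleDateFormat FORMAT = new SimpleDateFormat("HH:mm:ss");

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<Bean> beans = env.fromElements(
            new Bean(1, 1, "12:00:00"),
            new Bean(1, 2, "12:00:03"),
            new Bean(1, 1, "12:00:04"),  // expected: a 3-second window triggers here
            new Bean(1, 2, "12:00:05"),
            new Bean(1, 3, "12:00:06"),
            new Bean(1, 3, "12:00:07")   // expected: a 3-second window triggers here
        );

        // Event timestamps are taken from Bean.ts and are assumed to be ascending.
        beans.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Bean>() {
                @Override
                public long extractAscendingTimestamp(Bean element) {
                    return element.getTs();
                }
            })
            .keyBy("id")
            .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
            .max("value")
            .addSink(new SinkFunction<Bean>() {
                @Override
                public void invoke(Bean value, Context context) {
                    System.out.println("Sync on: " + value);
                }
            });
        env.execute("Windowing test");
    }

    // Follows Flink's POJO rules (public static class, public no-arg constructor,
    // getters/setters for every field) so keyBy("id") and max("value") can
    // resolve the fields by name.
    public static class Bean {

        private int id;
        private int value;
        private long ts;

        public Bean() {
        }

        Bean(int id, int value, String time) throws ParseException {
            this.id = id;
            this.value = value;
            this.ts = FORMAT.parse(time).toInstant().toEpochMilli();
        }

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public int getValue() { return value; }
        public void setValue(int value) { this.value = value; }
        public long getTs() { return ts; }
        public void setTs(long ts) { this.ts = ts; }

        @Override
        public String toString() {
            return "Bean{id=" + id + ", value=" + value + ", ts=" + ts + "}";
        }
    }
}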


Solution

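  • Flink allows processing-time windows on an event-time stream, because there are legitimate use cases for that, so setting TimeCharacteristic.EventTime does not change what TumblingProcessingTimeWindows does. If you want event-time windowing, you have to ask for it explicitly. In this case, use TumblingEventTimeWindows, i.e. replace .window(TumblingProcessingTimeWindows.of(Time.seconds(3))) with .window(TumblingEventTimeWindows.of(Time.seconds(3))). The timestamps and watermarks assigned by the AscendingTimestampExtractor are what event-time windows use to decide which window an element belongs to and when that window fires.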
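Note that tumbling event-time windows are aligned to the epoch, not to the first event: an element with timestamp t goes into the window starting at t - (t - offset + size) % size, which is essentially what Flink's TimeWindow.getWindowStartWithOffset computes (offset is 0 here). The plain-Java sketch below does not use Flink at all; it only applies that arithmetic to the six events from the question and keeps the per-window maximum, as max("value") would for key 1. As a simplifying assumption it uses milliseconds since midnight instead of the epoch milliseconds SimpleDateFormat produces; since time-zone offsets are normally whole minutes, that shift does not change which 3-second window an event lands in. The helper names (windowStart, millisOfDay, maxPerWindow, hhmmss) are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink): epoch-aligned tumbling-window assignment by event time.
final class EventTimeWindowSketch {

    // Start of the tumbling window that contains `timestamp`.
    // Assumes non-negative timestamps, as in the example.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Hypothetical helper: "HH:mm:ss" -> milliseconds since midnight.
    static long millisOfDay(String hhmmss) {
        String[] p = hhmmss.split(":");
        return ((Long.parseLong(p[0]) * 60 + Long.parseLong(p[1])) * 60 + Long.parseLong(p[2])) * 1000L;
    }

    // Hypothetical helper: milliseconds since midnight -> "HH:mm:ss".
    static String hhmmss(long millisOfDay) {
        long s = millisOfDay / 1000;
        return String.format("%02d:%02d:%02d", s / 3600, (s / 60) % 60, s % 60);
    }

    // Assigns each (timestamp, value) pair to its window and keeps the max value per window.
    // Keys are window starts; TreeMap keeps the windows in time order.
    static Map<Long, Integer> maxPerWindow(long[] timestamps, int[] values, long size) {
        Map<Long, Integer> result = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], 0L, size);
            result.merge(start, values[i], Math::max);
        }
        return result;
    }

    public static void main(String[] args) {
        // The six events for key 1 from the question.
        String[] times = {"12:00:00", "12:00:03", "12:00:04", "12:00:05", "12:00:06", "12:00:07"};
        int[] values = {1, 2, 1, 2, 3, 3};
        long[] ts = new long[times.length];
        for (int i = 0; i < times.length; i++) {
            ts[i] = millisOfDay(times[i]);
        }
        long size = 3_000L; // corresponds to Time.seconds(3)
        for (Map.Entry<Long, Integer> w : maxPerWindow(ts, values, size).entrySet()) {
            System.out.println("window [" + hhmmss(w.getKey()) + ", " + hhmmss(w.getKey() + size)
                + ") -> max value " + w.getValue());
        }
    }
}
```

Running main prints three windows, [12:00:00, 12:00:03), [12:00:03, 12:00:06) and [12:00:06, 12:00:09), with maxima 1, 2 and 3. So with TumblingEventTimeWindows the pipeline can be expected to emit three results for key 1 rather than two: each window fires once the watermark passes its end, and the final watermark emitted when the bounded fromElements source finishes flushes any window that is still open.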