
Flink doesn't close window with EventTimeWindows


Why does this code not produce any output? If I change to TumblingProcessingTimeWindows, everything works fine.

I could not find in the documentation what else I must add. Triggers? Evictors? Allowed lateness?

    // Watermarks trail the highest seen timestamp by 20 seconds.
    WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            .withTimestampAssigner((i, timestamp) -> Timestamp.valueOf(i.dt).getTime());

    // Sum the counts of all events in each 10-second event-time window.
    ds.assignTimestampsAndWatermarks(strategy)
            .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
            .reduce((acc, i) -> {
                acc.count += i.count;
                acc.dt = i.dt;
                return acc;
            })
            .addSink(new PrintSinkFunction<>());

Input:

{"userId":1,"count":11,"dt":"2023-04-11T09:29:12.244"}

The system time equals the event time in the input.
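For orientation, here is a minimal plain-Java sketch (no Flink dependency) of when the 10-second window containing the sample event would be expected to fire under a 20-second bounded-out-of-orderness strategy. The class and method names are hypothetical, and the formulas are my understanding of the rule: the watermark trails the highest seen timestamp by the delay plus 1 ms, and a window fires once the watermark reaches its last millisecond.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class WindowFiringSketch {
    // Start of the tumbling window containing the timestamp (no offset, non-negative timestamps).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Watermark assumed for a bounded-out-of-orderness generator after seeing maxTimestampMs.
    static long watermark(long maxTimestampMs, long outOfOrdernessMs) {
        return maxTimestampMs - outOfOrdernessMs - 1;
    }

    // An event-time window fires once the watermark reaches its last included millisecond.
    static boolean fires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(10).toMillis();
        long delay = Duration.ofSeconds(20).toMillis();
        long ts = LocalDateTime.parse("2023-04-11T09:29:12.244")
                .toInstant(ZoneOffset.UTC).toEpochMilli();
        long end = windowStart(ts, size) + size; // window is [09:29:10, 09:29:20)

        // The event itself does not push the watermark far enough to close its own window.
        System.out.println("fires after own event: " + fires(end, watermark(ts, delay)));
        // A later event at window end + 20 s is the earliest one that closes it.
        System.out.println("fires after event at end+20s: " + fires(end, watermark(end + delay, delay)));
    }
}
```

In other words, output for a window should only be expected once an event arrives whose timestamp is at least 20 seconds past the window end, and only if the watermark actually reaches the window operator.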

Update 2:

  1. I added some print output to withTimestampAssigner: it is called on every event.

  2. I added an OutputTag to catch dropped events: it stays empty.

    OutputTag<UserModel> lateTag = new OutputTag<UserModel>("late"){};

  3. I added a debug print inside the reduce function: it is called on every event.

But the sink never prints any output for a closed window =(

All code:

 private static void m4(DataStream<UserModel> ds) {
        WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((i, timestamp) -> {
                    long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
                    System.out.println(i.dt + " is: " + time +  " dont know: " + timestamp);
                    return time;
                });

        OutputTag<UserModel> lateTag = new OutputTag<UserModel>("late"){};

        SingleOutputStreamOperator<UserModel> reduce = ds.assignTimestampsAndWatermarks(strategy)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sideOutputLateData(lateTag)
                .reduce((acc, i) -> {
                    System.out.println(i.dt + " reDUCE:");
                    acc.count += i.count;
                    acc.dt = i.dt;
                    return acc;
                });
        // Print any records that were dropped as late.
        reduce.getSideOutput(lateTag).print();

        reduce.addSink(new PrintSinkFunction<>());
    }

Update 3:

Reply to @kkrugler. It's interesting that TumblingEventTimeWindows needs ProcessAllWindowFunction, although TumblingProcessingTimeWindows works without it.

So I added a ProcessAllWindowFunction, but again there is no result. I added a debug print, and this part of the code is never called.

Interestingly, if I change to TumblingProcessingTimeWindows, the window closes and the sink prints even without a ProcessAllWindowFunction.

All code:

public class Rich extends ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow> {
    @Override
    public void process(ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow>.Context context, Iterable<UserModel> iterable, Collector<UserModelEx> collector) throws Exception {
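        // With a pre-aggregating reduce, the iterable holds a single element: the reduced value.
        UserModel um = iterable.iterator().next();
        System.out.println(um.count + " rich:" + um.dt);
        // Plain instance instead of double-brace initialization, which creates an anonymous
        // subclass holding a reference to the enclosing function.
        UserModelEx out = new UserModelEx();
        out.userId = um.userId;
        out.count = um.count;
        out.wStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getStart()), ZoneOffset.UTC);
        out.wEnd = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getEnd()), ZoneOffset.UTC);
        collector.collect(out);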
    }
}

 private static void m4(DataStream<UserModel> ds) {

        WatermarkStrategy<UserModel> strategy = WatermarkStrategy.<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((i, timestamp) -> {
                    long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
                    System.out.println(i.dt + " assignEvent: " + time + " : " + timestamp);
                    return time;
                });


        SingleOutputStreamOperator<UserModelEx> reduce = ds.assignTimestampsAndWatermarks(strategy)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce((acc, i) -> {
                    acc.count += i.count;
                    acc.dt = i.dt;
                    System.out.println(acc.dt + " reduce:" + acc.count);
                    return acc;
                }, new Rich());

        reduce.print();

        //reduce.addSink(new PrintSinkFunction<UserModelEx>());


    }

Solution

  • After 4 days I found the problem, but I can't explain why the fix is necessary.

    env.setParallelism(1); solves my problem.

    I am reading from a Kafka topic with a single partition, so I assumed the parallelism would be 1 by default.

    I hope the experts will explain why my example needs this parameter...
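
    A likely explanation, sketched in plain Java without any Flink dependency: the default parallelism is usually greater than 1 (locally it follows the number of CPU cores), so the watermark assigner runs as several parallel subtasks even though only one of them receives data from the single partition. The window operator advances its event-time clock to the minimum watermark across all of its inputs, and the idle subtasks never move past their initial value. The class and method names below are hypothetical and only model that rule.

```java
import java.util.Arrays;

public class MinWatermarkSketch {
    // Event-time clock of an operator with several inputs: the minimum over all input watermarks.
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // Models upstream subtasks where only subtask 0 ever sees events;
    // the others stay at the initial watermark Long.MIN_VALUE.
    static long[] channels(int parallelism, long activeWatermark) {
        long[] watermarks = new long[parallelism];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        watermarks[0] = activeWatermark;
        return watermarks;
    }

    public static void main(String[] args) {
        long activeWatermark = 1_681_205_380_000L; // some watermark from the one busy subtask

        // Parallelism 4 with one partition: three idle inputs hold the clock back.
        long stuck = combinedWatermark(channels(4, activeWatermark));
        System.out.println("parallelism 4 advanced: " + (stuck == activeWatermark));

        // Parallelism 1: the single input drives the clock, so windows can close.
        long advancing = combinedWatermark(channels(1, activeWatermark));
        System.out.println("parallelism 1 advanced: " + (advancing == activeWatermark));
    }
}
```

    Processing-time windows are driven by the wall clock rather than by watermarks, which would explain why TumblingProcessingTimeWindows fires regardless. Besides env.setParallelism(1), adding withIdleness(Duration) to the WatermarkStrategy lets Flink ignore idle subtasks when computing the minimum, which may be worth trying if a higher parallelism is needed.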