Why does this code produce no output?
If I switch to TumblingProcessingTimeWindows, everything works fine.
I could not find in the documentation what else I must add. Triggers? Evictors? Allowed lateness?
WatermarkStrategy<UserModel> strategy = WatermarkStrategy
        .<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((i, timestamp) -> Timestamp.valueOf(i.dt).getTime());
ds.assignTimestampsAndWatermarks(strategy)
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce((acc, i) -> {
            acc.count += i.count;
            acc.dt = i.dt;
            return acc;
        })
        .addSink(new PrintSinkFunction());
Input:
{"userId":1,"count":11,"dt":"2023-04-11T09:29:12.244"}
The system time is the same as the time in the input events.
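For reference, the arithmetic that decides when this 10-second window can fire is sketched below in plain Java, with no Flink dependency. It assumes the behavior of the built-in bounded-out-of-orderness strategy (watermark = largest timestamp seen, minus the bound, minus 1 ms) and of the default event-time trigger (a window fires once the watermark reaches its last millisecond). The class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Plain-Java sketch of tumbling event-time window and watermark arithmetic.
// All names here are hypothetical; nothing in this class is a Flink API.
class WindowFiringSketch {

    // Start of the tumbling window that contains the timestamp
    // (window offset 0, non-negative timestamps assumed).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Watermark produced once the largest timestamp seen so far is maxTimestampMs.
    static long watermark(long maxTimestampMs, long outOfOrdernessMs) {
        return maxTimestampMs - outOfOrdernessMs - 1;
    }

    // The window [start, start + size) fires when the watermark reaches start + size - 1.
    static boolean fires(long windowStartMs, long sizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(10).toMillis();
        long bound = Duration.ofSeconds(20).toMillis();
        long ts = LocalDateTime.parse("2023-04-11T09:29:12.244")
                .toInstant(ZoneOffset.UTC).toEpochMilli();
        long start = windowStart(ts, size); // 09:29:10.000

        // With only the sample event seen, the watermark is about 20 s behind it.
        System.out.println(fires(start, size, watermark(ts, bound))); // false

        // An event stamped 09:29:40.000 or later pushes the watermark far enough.
        long later = start + size + bound;
        System.out.println(fires(start, size, watermark(later, bound))); // true
    }
}
```

Under these assumptions, the window holding the sample event cannot emit anything until an event stamped at least 09:29:40.000 has been seen, so a single event, or a stream that stops early, produces no output from an event-time window.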
Update 2:
I added some print statements to withTimestampAssigner; it is called on every event.
I added an OutputTag to catch dropped (late) events; nothing arrives there.
OutputTag<UserModel> lateTag = new OutputTag<UserModel>("late"){};
I added a debug print inside the reduce function; it is called on every event.
But the sink never prints anything for a closed window =(
Full code:
private static void m4(DataStream<UserModel> ds) {
    WatermarkStrategy<UserModel> strategy = WatermarkStrategy
            .<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            .withTimestampAssigner((i, timestamp) -> {
                long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
                System.out.println(i.dt + " is: " + time + " dont know: " + timestamp);
                return time;
            });
    OutputTag<UserModel> lateTag = new OutputTag<UserModel>("late"){};
    SingleOutputStreamOperator<UserModel> reduce = ds.assignTimestampsAndWatermarks(strategy)
            .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
            .sideOutputLateData(lateTag)
            .reduce((acc, i) -> {
                System.out.println(i.dt + " reDUCE:");
                acc.count += i.count;
                acc.dt = i.dt;
                return acc;
            });
    reduce.getSideOutput(lateTag).print();
    reduce.addSink(new PrintSinkFunction());
}
Update 3:
Reply to @kkrugler.
It's interesting that TumblingEventTimeWindows needs a ProcessAllWindowFunction, although TumblingProcessingTimeWindows works without it.
So I added a ProcessAllWindowFunction, but again there is no result. I added a print statement for debugging, and this part of the code is never called.
Interestingly, if I switch to TumblingProcessingTimeWindows, the window closes and the sink prints even without a ProcessAllWindowFunction.
Full code:
public class Rich extends ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow> {
    @Override
    public void process(ProcessAllWindowFunction<UserModel, UserModelEx, TimeWindow>.Context context,
                        Iterable<UserModel> iterable,
                        Collector<UserModelEx> collector) throws Exception {
        UserModel um = iterable.iterator().next();
        System.out.println(um.count + " rich:" + um.dt);
        collector.collect(new UserModelEx() {{
            userId = um.userId;
            count = um.count;
            wStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getStart()), ZoneOffset.UTC);
            wEnd = LocalDateTime.ofInstant(Instant.ofEpochMilli(context.window().getEnd()), ZoneOffset.UTC);
        }});
    }
}
private static void m4(DataStream<UserModel> ds) {
    WatermarkStrategy<UserModel> strategy = WatermarkStrategy
            .<UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            .withTimestampAssigner((i, timestamp) -> {
                long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
                System.out.println(i.dt + " assignEvent: " + time + " : " + timestamp);
                return time;
            });
    SingleOutputStreamOperator<UserModelEx> reduce = ds.assignTimestampsAndWatermarks(strategy)
            .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
            .reduce((acc, i) -> {
                acc.count += i.count;
                acc.dt = i.dt;
                System.out.println(acc.dt + " reduce:" + acc.count);
                return acc;
            }, new Rich());
    reduce.print();
    //reduce.addSink(new PrintSinkFunction<UserModelEx>());
}
So after 4 days I found the problem, but I can't explain why the fix is necessary.
Calling env.setParallelism(1); solves my problem.
I am reading from a Kafka topic that has a single partition, so I assumed the parallelism would be 1 by default.
I hope the experts can explain why my example needs this setting...
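One likely explanation, stated as an assumption because the environment setup is not shown: when the job runs with a parallelism greater than 1 and the topic has a single partition, only one source subtask ever sees events. The other subtasks never emit a watermark, and a downstream operator such as the windowAll operator advances its event-time clock to the minimum watermark over all of its inputs. The plain-Java sketch below illustrates that minimum rule only; it does not use Flink, and the class, method names, and sample values are hypothetical.

```java
import java.util.Arrays;

// Plain-Java sketch of how an operator combines watermarks from parallel inputs.
// All names and values are hypothetical; nothing in this class is a Flink API.
class MinWatermarkSketch {

    // The operator's event-time clock is the minimum watermark over its input channels.
    // A channel that has never emitted a watermark counts as Long.MIN_VALUE.
    static long combined(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // The window [start, end) fires when the combined watermark reaches end - 1.
    static boolean windowFires(long windowEndMs, long[] channelWatermarks) {
        return combined(channelWatermarks) >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // hypothetical window [0, 10000)
        long active = 50_000L;    // watermark of the one subtask that receives data

        // Parallelism 1: the single channel carries the watermark.
        long[] parallelism1 = { active };
        // Parallelism 4 with one partition: three channels never advance.
        long[] parallelism4 = { active, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE };

        System.out.println(windowFires(windowEnd, parallelism1)); // true
        System.out.println(windowFires(windowEnd, parallelism4)); // false
    }
}
```

If this is the cause, it would also explain why TumblingProcessingTimeWindows works: processing-time windows fire on the wall clock and ignore watermarks. Besides setting the parallelism to 1, WatermarkStrategy offers withIdleness(Duration), which marks inputs that stay silent as idle so that they no longer hold the watermark back; whether it fits this job is untested here.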