I am trying to understand the relationship between windows and watermark generation in Apache Flink, and I have a problem with the example below:
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Periodic watermarks are requested every 10 seconds of processing time
    env.getConfig().setAutoWatermarkInterval(10000);

    // props: Kafka consumer properties (defined elsewhere)
    FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("watermarkFlink", new SimpleStringSchema(), props);
    DataStream<String> orderStream = env.addSource(kafkaSource);

    // Parse "Category,timestamp" records into Order objects
    DataStream<Order> dataStream = orderStream.map(str -> {
        String[] order = str.split(",");
        return new Order(order[0], Long.parseLong(order[1]), null);
    });

    // Bounded out-of-orderness of 1 second; the event timestamp comes from the record
    WatermarkStrategy<Order> orderWatermarkStrategy = CustomWatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1))
            .withTimestampAssigner((element, timestamp) -> element.getTimestamp());

    // Count events per category in 20-second windows that slide every 5 seconds
    dataStream
            .assignTimestampsAndWatermarks(orderWatermarkStrategy)
            .map(new OrderKeyValue())
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> src) throws Exception {
                    return src.f0;
                }
            })
            .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))
            .sum(1)
            .print("Windows");

    dataStream.print("Data");
    env.execute();
}

// Maps each order to (category, 1) so the window can sum the counts
public static class OrderKeyValue implements MapFunction<Order, Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> map(Order order) {
        return new Tuple2<>(order.getCategory(), 1);
    }
}
```
The timestamp is a long that comes from the Kafka record, which looks like A,4 or C,8, where C is the category and 8 is the timestamp.
Whenever I send an event, the data stream is printed, but nothing is printed for the windowed stream (print("Windows")). Also, suppose I receive an event A,12, then a watermark is generated (10 seconds later), and then C,2 arrives after the first window has been closed: will it be processed in that window, or will it just be ignored?
There's a tutorial in the Flink documentation that should help clarify these concepts: https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/streaming_analytics/
But to summarize the situation:
If you have an event stream like (A,4) (C,8) (A,12), then those integers will be interpreted as milliseconds.
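With 20-second windows that slide every 5 seconds, an event with timestamp 4 is assigned to four windows: [-15000, 5000), [-10000, 10000), [-5000, 15000), and [0, 20000). The earliest of these fires once the watermark reaches 4999 (its end minus 1 ms), and the window [0, 20000) waits for a watermark of 19999.
With a bounded out-of-orderness of 1 second, the watermark trails the largest timestamp seen so far by 1001 ms, so you'll need an event with a timestamp of at least 6000 before any window fires, and at least 21000 before [0, 20000) fires.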
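To see exactly which windows an event lands in and what watermark each one needs, here is a plain-Java sketch with no Flink dependency. It assumes window starts are aligned to multiples of the slide (offset 0), as with `SlidingEventTimeWindows.of(size, slide)`, and that the bounded-out-of-orderness generator emits `maxTimestamp - bound - 1`; the class and method names are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing the window/watermark arithmetic (not Flink code).
class SlidingWindowMath {
    static final long SIZE = 20_000;            // Time.seconds(20)
    static final long SLIDE = 5_000;            // Time.seconds(5)
    static final long OUT_OF_ORDERNESS = 1_000; // Duration.ofSeconds(1)

    // Returns every [start, end) window that contains the timestamp, latest first.
    static List<long[]> assignWindows(long timestamp) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is <= timestamp, aligned to the slide.
        long lastStart = timestamp - Math.floorMod(timestamp, SLIDE);
        for (long start = lastStart; start > timestamp - SIZE; start -= SLIDE) {
            windows.add(new long[] {start, start + SIZE});
        }
        return windows;
    }

    // An event-time window fires once the watermark reaches its last timestamp (end - 1).
    static long watermarkNeededToFire(long[] window) {
        return window[1] - 1;
    }

    // Bounded-out-of-orderness watermark for the largest timestamp seen so far.
    static long watermarkFor(long maxTimestamp) {
        return maxTimestamp - OUT_OF_ORDERNESS - 1;
    }

    // Smallest event timestamp whose watermark is high enough to fire the window.
    static long timestampNeededToFire(long[] window) {
        return watermarkNeededToFire(window) + OUT_OF_ORDERNESS + 1;
    }

    public static void main(String[] args) {
        for (long[] w : assignWindows(4)) {
            System.out.printf("[%d, %d) fires at watermark %d, needs an event with timestamp >= %d%n",
                    w[0], w[1], watermarkNeededToFire(w), timestampNeededToFire(w));
        }
        // The largest timestamp in the sample stream (A,4) (C,8) (A,12) is 12.
        System.out.println("watermark after (A,12): " + watermarkFor(12));
    }
}
```

Running it shows that after (A,12) the watermark is still negative (-989), so none of the windows can fire yet.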
And since you have set the auto-watermark interval to 10 seconds, your application has to run for 10 seconds of wall-clock time before the first watermark is generated. (I can't think of any situation where a watermark interval this large is helpful.)
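The effect of the interval can be illustrated with a small plain-Java simulation. It assumes, as a simplification, that the periodic generator is polled every `intervalMs` of processing time starting one interval after the job starts, that a watermark is only emitted when it advances, and that nothing is emitted before the first event; the arrival times (1 s, 2 s, 3 s) and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of periodic bounded-out-of-orderness watermarks (not Flink code).
class PeriodicWatermarkSimulation {
    static final long OUT_OF_ORDERNESS = 1_000;

    // arrivals[i] = {processingTimeMs, eventTimestampMs}, sorted by processing time.
    // Returns {tickTimeMs, watermark} for each emission up to untilMs.
    static List<long[]> simulate(long[][] arrivals, long intervalMs, long untilMs) {
        List<long[]> emitted = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        long lastWatermark = Long.MIN_VALUE;
        int next = 0;
        for (long tick = intervalMs; tick <= untilMs; tick += intervalMs) {
            // Fold in every event that arrived up to this tick.
            while (next < arrivals.length && arrivals[next][0] <= tick) {
                maxTimestamp = Math.max(maxTimestamp, arrivals[next][1]);
                next++;
            }
            if (maxTimestamp == Long.MIN_VALUE) {
                continue; // no event seen yet
            }
            long watermark = maxTimestamp - OUT_OF_ORDERNESS - 1;
            if (watermark > lastWatermark) { // only forward progress is emitted
                emitted.add(new long[] {tick, watermark});
                lastWatermark = watermark;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[][] arrivals = {{1_000, 4}, {2_000, 8}, {3_000, 12}};
        for (long interval : new long[] {10_000, 200}) {
            long[] first = simulate(arrivals, interval, 10_000).get(0);
            System.out.printf("interval %d ms: first watermark %d at t=%d ms%n", interval, first[1], first[0]);
        }
    }
}
```

With a 10-second interval the first watermark appears only at t=10000 ms, while a short interval (200 ms is Flink's default) lets the watermark follow the events almost immediately.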
If an event arrives after its window has been closed, then it will be ignored (by default). You can configure allowed lateness to arrange for late events to trigger additional window firings.
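As a rough model of that rule, the plain-Java sketch below drops an event only when every window it belongs to has `end - 1 + allowedLateness <= watermark`. This is my reading of how Flink's window operator treats late elements, not its actual code, and the class and method names are hypothetical. In the Flink job itself, lateness would be set with something like `.allowedLateness(Time.seconds(5))` after `.window(...)`.

```java
// Hypothetical model of the late-event rule for the 20 s / 5 s sliding windows (not Flink code).
class LatenessCheck {
    static final long SIZE = 20_000;
    static final long SLIDE = 5_000;

    // True if the event would be dropped: all of its windows are past end - 1 + allowedLateness.
    static boolean isDropped(long timestamp, long watermark, long allowedLateness) {
        long lastStart = timestamp - Math.floorMod(timestamp, SLIDE);
        for (long start = lastStart; start > timestamp - SIZE; start -= SLIDE) {
            long maxTimestamp = start + SIZE - 1; // last timestamp covered by [start, start + SIZE)
            if (maxTimestamp + allowedLateness > watermark) {
                return false; // at least one window still accepts the event
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Question's scenario: (A,12) yields watermark -989, so (C,2) is not late at all.
        System.out.println(isDropped(2, -989, 0));
        // After an event with timestamp 21000 the watermark is 19999, and (C,2) is dropped...
        System.out.println(isDropped(2, 19_999, 0));
        // ...unless 5 s of allowed lateness keeps [0, 20000) open, causing an extra firing.
        System.out.println(isDropped(2, 19_999, 5_000));
    }
}
```

Note that in the scenario from the question, C,2 is not late: after A,12 the watermark is only -989, so C,2 simply joins its windows normally.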