
Flink - How many watermarks get injected in this bounded stream?


In the code below:

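import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkQuestion {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = env.fromElements(
                Tuple2.of("01", 1),
                Tuple2.of("02", 2),
                Tuple2.of("03", 3),
                Tuple2.of("04", 4),
                Tuple2.of("05", 5))
            // Each element's event time is the wall-clock time at which it is assigned
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                    .withTimestampAssigner((tuple, ts) -> System.currentTimeMillis()));

        // Print the elements so the job produces visible output; execute() is called only once
        dataStream.print();
        env.execute("gghh");
    }
}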

Flink 1.17.1, Java 11


1) How many watermarks will be injected into this bounded stream? Is it 5?

2) How can I print those watermarks? This is just for debugging.


Solution

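    1. forMonotonousTimestamps is a periodic watermark generator: it emits watermarks every 200 ms by default (the auto-watermark interval, configurable with ExecutionConfig#setAutoWatermarkInterval). This job isn't going to run long enough for that to happen even once, so I would expect just one watermark, emitted at the end of the job when the bounded input is exhausted, with timestamp Long.MAX_VALUE. (See this answer for more about the end-of-job watermark.)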

    2. Watermarks can be inspected in the Flink web UI; that's the easiest way to debug them. Otherwise, you can read the current watermark with ctx.timerService().currentWatermark(), using the context passed into the processElement and onTimer methods of a KeyedProcessFunction.
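A minimal sketch of the second point, assuming the Flink 1.17 DataStream API: the job keys the stream by the String field and uses a KeyedProcessFunction that prints ctx.timerService().currentWatermark() for every element. It also registers one event-time timer per element, so onTimer reports the watermark that eventually fires it. The class name PrintWatermarks, the helper describeWatermark, and the output format are hypothetical choices for illustration only.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PrintWatermarks {

    // Hypothetical helper: makes the two sentinel watermark values readable
    static String describeWatermark(long wm) {
        if (wm == Long.MIN_VALUE) {
            return "no watermark yet";
        }
        if (wm == Long.MAX_VALUE) {
            return "end-of-input watermark";
        }
        return Long.toString(wm);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                Tuple2.of("01", 1),
                Tuple2.of("02", 2),
                Tuple2.of("03", 3),
                Tuple2.of("04", 4),
                Tuple2.of("05", 5))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                    .withTimestampAssigner((tuple, ts) -> System.currentTimeMillis()))
            // Keying is required to use event-time timers
            .keyBy(t -> t.f0)
            .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, String>() {
                @Override
                public void processElement(
                        Tuple2<String, Integer> value, Context ctx, Collector<String> out) {
                    // The latest watermark this operator instance has received so far
                    long wm = ctx.timerService().currentWatermark();
                    out.collect(value + " ts=" + ctx.timestamp()
                            + " currentWatermark=" + describeWatermark(wm));
                    // Fires once a watermark >= this element's timestamp arrives
                    ctx.timerService().registerEventTimeTimer(ctx.timestamp());
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                    out.collect("timer " + timestamp + " fired, currentWatermark="
                            + describeWatermark(ctx.timerService().currentWatermark()));
                }
            })
            .print();

        env.execute("print-watermarks");
    }
}
```

If the reasoning in the first point holds, the processElement lines would likely all show "no watermark yet", and the timers would likely fire only once the end-of-input watermark arrives. The exact interleaving depends on timing (a slow run could let a periodic watermark through first), so treat this as an expectation rather than a guaranteed output.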