My Flink pipeline looks something like this:
FlinkKafkaConsumerBase kafkaConsumer = new FlinkKafkaConsumer<>(topic, new DeserializationSchema(), props);
kafkaSource = env.addSource(kafkaConsumer).filter(<>);
WatermarkStrategy<GenericMetricV2> watermarkStrategy = WatermarkStrategy
    .withTimestampAssigner((metric, timestamp) -> {
        // Log the event timestamp next to the record's previous timestamp.
        // LOG is an SLF4J-style logger; its name is assumed here.
        LOG.info("ETS: mts: {}, ts: {}", metric.metricPoint.timeInstant, timestamp);
        return metric.metricPoint.timeInstant;
    });
metricStream = kafkaSource
    .transform("debugFilter", TypeInformation.of(<>), new StreamWatermarkDebugFilter<>("Op"))
    .aggregate(AggregateFunction, ProcessWindowFunction)
I am running with parallelism 1 and the default setAutoWatermarkInterval of 200 ms. I did not call setStreamTimeCharacteristic because, as of Flink 1.12, event time is the default.
The output of StreamWatermarkDebugFilter shows that the watermark is progressing, but every event is marked as late and ends up in the lateOutputTag side output:
2021-05-18 17:14:19,745 INFO - ETS: mts: 1621310100000, ts: 1621310582271
2021-05-18 17:14:19,745 INFO - ETS: mts: 1621310100000, ts: 1621310582271
2021-05-18 17:14:19,842 INFO StreamWatermarkDebugFilter - Op, Watermark: 1621309499999
2021-05-18 17:14:19,944 INFO - ETS: mts: 1621309800000, ts: 1621310582275
2021-05-18 17:14:19,944 INFO - ETS: mts: 1621309800000, ts: 1621310582275
2021-05-18 17:14:20,107 INFO - ETS: mts: 1621310380000, ts: 1621310582278
2021-05-18 17:14:20,107 INFO - ETS: mts: 1621310380000, ts: 1621310582278
2021-05-18 17:14:20,137 INFO StreamWatermarkDebugFilter - Op, Watermark: 1621309779999
2021-05-18 17:14:20,203 INFO - ETS: mts: 1621309800000, ts: 1621310582279
2021-05-18 17:17:47,839 INFO - ETS: mts: 1621310100000, ts: 1621310681159
2021-05-18 17:17:47,848 INFO StreamWatermarkDebugFilter - Op, Watermark: 1621310099999
2021-05-18 17:17:47,958 INFO - ETS: mts: 1621309800000, ts: 1621310681237
2021-05-18 17:17:47,958 INFO - ETS: mts: 1621309800000, ts: 1621310681237
2021-05-18 17:22:24,207 INFO - ETS: mts: 1621310100000, ts: 1621310703622
2021-05-18 17:22:24,229 INFO StreamWatermarkDebugFilter - Op, Watermark: 1621310399999
2021-05-18 17:22:24,315 INFO - ETS: mts: 1621309800000, ts: 1621310705177
2021-05-18 17:22:24,315 INFO - ETS: mts: 1621309800000, ts: 1621310705177
I have seen this discussion and it is not an idleness problem.
It looks related to this discussion. Can someone suggest how I can debug this further to identify the cause?
The problem was in a part of the code I did not share. I was calling filter() after assignTimestampsAndWatermarks(), so the skewed data that I was not interested in was still pushing the watermark forward. I moved the filter() before assignTimestampsAndWatermarks() and it now works as expected.
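To illustrate why the order matters, here is a minimal sketch in plain Java. It is a simplified model, not Flink code: the watermark is recomputed on every event instead of every 200 ms, and the class name, the `wanted` predicate, the skewed timestamp 1621311000000, the 600 s out-of-orderness delay and the 5 minute window are all assumptions made for the example. The lateness rule it uses (an element is late once its window's last timestamp is at or behind the watermark, with zero allowed lateness) is how event-time windows decide lateness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical model of "filter before vs. after timestamp/watermark assignment".
class FilterOrderSketch {

    // Simplified bounded-out-of-orderness watermark:
    // watermark = highest timestamp seen so far - delay - 1.
    static final class BoundedWatermark {
        private final long delayMs;
        private long maxTs;

        BoundedWatermark(long delayMs) {
            this.delayMs = delayMs;
            this.maxTs = Long.MIN_VALUE + delayMs + 1; // start value avoids underflow
        }

        void onEvent(long ts) { maxTs = Math.max(maxTs, ts); }

        long current() { return maxTs - delayMs - 1; }
    }

    // End (exclusive) of the tumbling window containing ts; assumes ts >= 0.
    public static long windowEnd(long ts, long windowMs) {
        return ts - (ts % windowMs) + windowMs;
    }

    // Late when the window's last timestamp is at or behind the watermark
    // (allowed lateness assumed to be 0).
    public static boolean isLate(long ts, long windowMs, long watermark) {
        return windowEnd(ts, windowMs) - 1 <= watermark;
    }

    // Returns the wanted events that would be classified as late.
    // filterFirst = true  -> unwanted events never reach the watermark generator.
    // filterFirst = false -> unwanted events advance the watermark, then get dropped.
    public static List<Long> lateEvents(long[] events, LongPredicate wanted,
                                        boolean filterFirst, long delayMs, long windowMs) {
        BoundedWatermark wm = new BoundedWatermark(delayMs);
        List<Long> late = new ArrayList<>();
        for (long ts : events) {
            boolean keep = wanted.test(ts);
            if (filterFirst && !keep) {
                continue; // dropped before it can influence the watermark
            }
            wm.onEvent(ts);
            if (keep && isLate(ts, windowMs, wm.current())) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Second element is the hypothetical skewed event that the filter rejects.
        long[] events = {1621310100000L, 1621311000000L, 1621310100000L,
                         1621309800000L, 1621310380000L};
        LongPredicate wanted = ts -> ts <= 1621310400000L;

        System.out.println(lateEvents(events, wanted, false, 600_000L, 300_000L).size()); // prints 3
        System.out.println(lateEvents(events, wanted, true, 600_000L, 300_000L).size());  // prints 0
    }
}
```

With the filter applied last, the single skewed event lifts the watermark to 1621310399999, so every later event I actually care about falls into a window that is already closed. With the filter applied first, the watermark only follows the data that reaches the window operator.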