I have a Flink job that processes Metric(name, type, timestamp, value) objects. The metrics are keyed by (name, type, timestamp). I am trying to process metrics with a specific timestamp, starting at timestamp + 50 seconds. Every timestamp has an interval of 10 seconds. I am currently trying window(SlidingEventTimeWindows.of(Time.seconds(50), Time.seconds(10))) with a ProcessWindowFunction:
@Override
public void process(Tuple3<String, Integer, Long> key, Context context, Iterable<Metric> input, Collector<Metric> collector) {
    long windowStartTime = context.window().getStart();
    long timestamp = key.f2;
    if (windowStartTime <= timestamp && timestamp < windowStartTime + 10_000L) { // window bounds are in milliseconds
        collector.collect(input.iterator().next()); // forwarded to a reducer
    }
}
However, I only get the first wave of output and stop receiving anything after that. I also tried adding an isProcessed field to Metric, marking it in the reducer function, and applying an Evictor, but that doesn't seem to work. The source and sink are a Kafka consumer and producer. I also have a watermark set up:
.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Metric>(Time.seconds(50)) {
        @Override
        public long extractTimestamp(Metric metric) {
            return metric.getTimestamp() * 1000; // to milliseconds
        }
    })
The reason you are not getting more events in each window is that you have included the timestamp in the key. This has the effect of forcing each window to only include events that all share the same timestamp. Key by (name, type) instead, and each sliding window will then be able to accumulate the full sequence of 10-second buckets for that metric.
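To illustrate the effect without needing a Flink cluster, here is a minimal plain-Java sketch (the Metric record is a simplified stand-in for the job's class, not its real definition) that groups the same events two ways. Grouping by (name, type, timestamp) puts every event alone in its own partition, which is why each window pane fires at most once; grouping by (name, type) keeps all timestamps of one metric together, which is what the 50s/10s sliding window needs:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeyingDemo {
    // Simplified stand-in for the job's Metric class.
    record Metric(String name, int type, long timestamp, double value) {}

    public static void main(String[] args) {
        List<Metric> metrics = List.of(
                new Metric("cpu", 1, 0L, 0.5),
                new Metric("cpu", 1, 10L, 0.6),
                new Metric("cpu", 1, 20L, 0.7));

        // Keying by (name, type, timestamp): every partition holds exactly one
        // timestamp, so a sliding window over it can never mix 10-second buckets.
        Map<List<Object>, List<Metric>> byNameTypeTs = metrics.stream()
                .collect(Collectors.groupingBy(m -> List.of(m.name(), m.type(), m.timestamp())));
        System.out.println(byNameTypeTs.size()); // 3 partitions, one event each

        // Keying by (name, type): one partition collects all timestamps of the
        // metric, so successive sliding windows keep receiving new events.
        Map<List<Object>, List<Metric>> byNameType = metrics.stream()
                .collect(Collectors.groupingBy(m -> List.of(m.name(), m.type())));
        System.out.println(byNameType.size()); // 1 partition holding all 3 events
    }
}
```

In the job itself that would mean keying with something like keyBy(m -> Tuple2.of(m.getName(), m.getType())) and letting the window assigner, not the key, do the per-timestamp bucketing.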