Tags: java, apache-flink, flink-streaming

SlidingEventTimeWindows does not produce any output


I have a streaming job configured as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Record> stream = env.addSource(new FlinkKafkaConsumer(
    SystemsCpu.TOPIC,
    ConfluentRegistryAvroDeserializationSchema.forGeneric(SystemsCpu.SCHEMA, registry),
    config)
    .setStartFromLatest());

DataStream<Anomaly> anomalies = stream
    .keyBy(x -> x.get("host").toString())
    .window(SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20))) // produces output with TumblingEventTimeWindows
    .process(new AnomalyDetector())
    .name("anomaly-detector");

public class AnomalyDetector extends ProcessWindowFunction<Record, Anomaly, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Record> input, Collector<Anomaly> out) {
    var anomaly = new Anomaly();
    anomaly.setValue(1.0);
    out.collect(anomaly);
  }
}

However, for some reason SlidingEventTimeWindows does not produce any output for the AnomalyDetector to process (i.e. process is never triggered). If I use TumblingEventTimeWindows instead, it works as expected.

Any ideas what might be causing this? Am I using SlidingEventTimeWindows incorrectly?


Solution

  • When doing any sort of event time windowing it is necessary to provide a WatermarkStrategy. Watermarks mark a spot in the stream, and signal that the stream is complete up through some specific point in time. Event time windows can only be triggered by the arrival of a sufficiently large watermark.

    See the docs for details, but it could look something like this:

    DataStream<MyType> timestampedEvents = events
      .assignTimestampsAndWatermarks(
          WatermarkStrategy
            .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((event, timestamp) -> event.timestamp));
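
To see why a window only fires once a sufficiently large watermark arrives, here is a minimal plain-Java sketch (no Flink dependency) of roughly what a bounded-out-of-orderness strategy does. The class and method names are hypothetical. The arithmetic (watermark = highest timestamp seen, minus the bound, minus 1 ms; a window may fire once the watermark reaches its end minus 1 ms) reflects my understanding of Flink's built-in behavior, so treat it as an illustration rather than the actual implementation.

```java
import java.time.Duration;

// Hypothetical stand-in for a bounded-out-of-orderness watermark generator.
public class WatermarkSketch {
    private final long boundMillis;
    private long maxTimestamp;

    public WatermarkSketch(Duration bound) {
        this.boundMillis = bound.toMillis();
        // Start high enough that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Called for every event: remember the highest event timestamp seen so far.
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark trails the highest timestamp by the allowed out-of-orderness.
    public long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    // An event-time window [start, end) may fire once the watermark reaches end - 1.
    public static boolean windowCanFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(Duration.ofSeconds(10));
        long windowEnd = 60_000L; // window covering [0, 60000)

        wm.onEvent(65_000L); // watermark is now 54999
        System.out.println(windowCanFire(windowEnd, wm.currentWatermark())); // prints false

        wm.onEvent(70_000L); // watermark is now 59999
        System.out.println(windowCanFire(windowEnd, wm.currentWatermark())); // prints true
    }
}
```

Note that before any event has been seen the watermark sits at its minimum value, so no window can fire. That is the situation a job is in when no watermark strategy is assigned at all.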
    

    However, since you are using Kafka, it's usually better to have the Flink Kafka consumer do the watermarking:

    FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
    
    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy...);
    
    DataStream<MyType> stream = env.addSource(kafkaSource);
    

    Note that if you use this latter approach, and if your events are in temporal order within each Kafka partition, you can take advantage of the per-partition watermarking that the Flink Kafka source provides, and use WatermarkStrategy.forMonotonousTimestamps() rather than the bounded-out-of-orderness strategy. This has a number of advantages; for one, the watermark does not have to lag behind by an out-of-orderness bound, so windows can fire sooner.
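
As a rough illustration of per-partition watermarking, the plain-Java sketch below tracks the latest timestamp per partition and reports the minimum across partitions as the source's watermark. All names are hypothetical and this is not the Flink Kafka source's actual code. It assumes timestamps are ascending within each partition, so each partition's watermark is simply its latest timestamp minus 1 ms.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-partition watermarking with monotonous timestamps.
public class PerPartitionWatermarkSketch {
    // Latest event timestamp seen per Kafka partition.
    private final Map<Integer, Long> latestPerPartition = new HashMap<>();

    public PerPartitionWatermarkSketch(int partitionCount) {
        for (int p = 0; p < partitionCount; p++) {
            // MIN_VALUE + 1 so that "latest - 1" below cannot underflow.
            latestPerPartition.put(p, Long.MIN_VALUE + 1);
        }
    }

    // Record an event read from the given partition.
    public void onEvent(int partition, long eventTimestamp) {
        latestPerPartition.merge(partition, eventTimestamp, Math::max);
    }

    // The overall watermark is the minimum of the per-partition watermarks.
    public long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long latest : latestPerPartition.values()) {
            min = Math.min(min, latest - 1);
        }
        return min;
    }

    public static void main(String[] args) {
        PerPartitionWatermarkSketch source = new PerPartitionWatermarkSketch(2);

        source.onEvent(0, 1_000L);
        // Partition 1 has delivered nothing yet, so it holds the watermark back.
        System.out.println(source.currentWatermark() == Long.MIN_VALUE); // prints true

        source.onEvent(1, 400L);
        System.out.println(source.currentWatermark()); // prints 399

        source.onEvent(1, 1_500L);
        System.out.println(source.currentWatermark()); // prints 999
    }
}
```

Because each partition is tracked separately, interleaving between partitions does not count as out-of-orderness. Only the ordering within a single partition matters.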

    By the way, and unrelated to your question, you should be aware that by specifying SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20)), every event will be copied into each of 60 overlapping windows (the 20-minute size divided by the 20-second slide).
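
The figure of 60 can be checked with a small plain-Java sketch that lists the sliding windows a single timestamp falls into. The class and method names are hypothetical; the loop mirrors what I understand the sliding window assigner to do (with a zero offset), but it is an illustration, not Flink's code.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which sliding windows does one event belong to?
public class SlidingWindowSketch {

    // Returns the start times of all windows [start, start + size) that contain the timestamp.
    public static List<Long> windowStartsFor(long timestamp, long sizeMillis, long slideMillis) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide.
        long lastStart = timestamp - Math.floorMod(timestamp, slideMillis);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(20).toMillis();
        long slide = Duration.ofSeconds(20).toMillis();

        List<Long> starts = windowStartsFor(1_600_000_000_000L, size, slide);
        System.out.println(starts.size()); // prints 60
    }
}
```

Each of those 60 windows keeps its own copy of the event when a ProcessWindowFunction is used, which is why a long window with a short slide can get expensive.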