Tags: java, apache-kafka, apache-flink, flink-streaming

Apache Flink: understanding watermark idleness and its relation to the bounded out-of-orderness duration and the window duration


I have a Flink pipeline configured with a Kafka connector.

I have set the watermark generation frequency to 2 seconds using:

env.getConfig().setAutoWatermarkInterval(2000);

My tumbling window is 60 seconds. Within each window we do some aggregations, and processing is event-time based, using the timestamp from one of our data fields.
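For context, the windowing step would look roughly like this. The key selector and aggregation are not shown in the question, so `keyField` and `MyAggregate` below are illustrative placeholders, not the actual job code:

```java
// Sketch of the windowing described above (keyField and MyAggregate are
// placeholders; the real key and aggregation logic are not in the question).
DataStream<MyPojo> events = env.addSource(myEvents);

events
    .keyBy(evt -> evt.keyField)
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .aggregate(new MyAggregate());
```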

I have not configured allowedLateness for my watermark strategy or for my stream.

final ConnectorConfig topicConfig = config.forTopic("mytopic");
final FlinkKafkaConsumer<MyPojo> myEvents = new FlinkKafkaConsumer<>(
        topicConfig.name(),
        AvroDeserializationSchema.forSpecific(MyPojo.class),
        topicConfig.forConsumer()
);
myEvents.setStartFromLatest();



myEvents.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<MyPojo>forBoundedOutOfOrderness(
                Duration.ofSeconds(30))
        .withIdleness(Duration.ofSeconds(120))
        .withTimestampAssigner((evt, timestamp) -> evt.event_timestamp_field));

Q.1 From what I am reading, the window for time 0-60 will be computed at the 90-second mark, 30-90 at 120 seconds, and so on. However, since we are using tumbling windows, i.e. no overlaps, my guess is there is no 30-90 window; the next window after 0-60 is 60-120, which gets triggered at the 150-second mark. Am I right?

Q.2 Without allowedLateness, all late data will be discarded. E.g. an event with a timestamp of 45 that arrives after the 90-second mark is considered late and misses the first window, i.e. 0-60. For window 60-120 its timestamp does not match, so it will be discarded and not included in the window fired at the 150-second mark. Am I right?

Q.3 For the source idleness duration, I chose 120, meaning that if any Kafka partition for the topic receives no data, it is marked idle after 2 minutes, and watermarks then advance based on the other, active partitions. My question is about the selection of this number, i.e. 2 minutes, and whether it has anything to do with the window duration (60 seconds) or the out-of-orderness (30 seconds). If so, what should I keep in mind to select it appropriately, so that data isn't stranded as late due to watermarks that fail to advance because of idle partitions?

Or is 120 seconds too long a wait, such that I could potentially miss data, and should I instead set this to something much less than the out-of-orderness duration to ensure zero data loss?

EDIT: Added some more code


Solution

  • Q1: Yes, that's correct.

    Q2: Yes, that's also correct.

    Q3: The details here depend on whether you are having the Kafka source apply the WatermarkStrategy, in which case it will do per-partition watermarking, or whether the WatermarkStrategy is deployed as a separate operator somewhere after (typically chained immediately after) the source operator.

    In the first case (with per-partition watermarking done by the FlinkKafkaConsumer) you will do something like this:

    FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>(...);
    
    kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy ...);
    
    DataStream<MyType> stream = env.addSource(kafkaSource);
    

    whereas doing the watermarking separately, after the source, looks like this:

    DataStream<MyType> events = env.addSource(...);
    
    DataStream<MyType> timestampedEvents = events
      .assignTimestampsAndWatermarks(
          WatermarkStrategy
            .<MyType>forBoundedOutOfOrderness(Duration ...)
            .withTimestampAssigner((event, timestamp) -> event.timestamp));
    

    When the watermarking is done on a per-partition basis, then a single idle partition will hold back the watermark for the consumer/source instance handling that partition -- until the idleness timeout kicks in (120 seconds in your example). By contrast, if the watermarking is done in a separate operator chained to the source, then only if all of the partitions assigned to that source instance (the one with an idle partition) are idle will the watermarks be held up (again, for 120 seconds).

    But regardless of those details, there will hopefully be no data loss. There will be a period of time during which windows are not triggered (because the watermarks are not advancing), but events will continue to be processed and assigned to their appropriate windows. Once watermarks resume, those windows will close and deliver their results.

    Data loss can occur if the partition was idle not because there simply were no events, but because an upstream failure disrupted the flow and will eventually produce a burst of late events. Once the idleness timeout expires, the watermark advances, and when those delayed events finally arrive they will be late (unless your bounded-out-of-orderness delay is large enough to accommodate them). If you choose to ignore late events, those events will be lost.
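The arithmetic behind Q1 and Q2 can be checked without Flink. A minimal plain-Java sketch of aligned tumbling-window assignment, and of the event time at which a window fires under `forBoundedOutOfOrderness`:

```java
public class WindowTiming {

    // Start of the aligned tumbling window containing ts (seconds).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Windows are half-open: [start, start + size).
    static long windowEnd(long ts, long size) {
        return windowStart(ts, size) + size;
    }

    // With forBoundedOutOfOrderness(d), the watermark trails the maximum
    // timestamp seen by d, so the window containing ts fires once an event
    // with timestamp >= end + d has been observed.
    static long firesAtEventTime(long ts, long size, long outOfOrderness) {
        return windowEnd(ts, size) + outOfOrderness;
    }

    public static void main(String[] args) {
        System.out.println(windowStart(45, 60));          // event at 45 lands in window starting at 0
        System.out.println(firesAtEventTime(45, 60, 30)); // window 0-60 fires at event time 90
        System.out.println(firesAtEventTime(61, 60, 30)); // window 60-120 fires at event time 150
    }
}
```

This matches the timeline in the question: window 0-60 fires at the 90-second mark, and the next tumbling window, 60-120, fires at 150.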
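If silently dropping those late events is not acceptable, one alternative (a sketch, reusing the illustrative `keyField` and `MyAggregate` placeholders from earlier, which are not in the question's code) is to route them to a side output rather than discard them:

```java
// Sketch: capture events that arrive after the watermark has passed their
// window, instead of silently dropping them.
final OutputTag<MyPojo> lateTag = new OutputTag<MyPojo>("late-events") {};

SingleOutputStreamOperator<MyResult> results = events
    .keyBy(evt -> evt.keyField)
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .sideOutputLateData(lateTag)
    .aggregate(new MyAggregate());

// Late events can then be logged, stored, or reprocessed separately.
DataStream<MyPojo> lateEvents = results.getSideOutput(lateTag);
```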