
Does Flink keep state for already-closed event-time windows when using watermarks?


I have a Flink job that aggregates data using keyed tumbling windows with event time and watermarks.

My question is: does Flink hold state for windows it has already closed? Otherwise, I have no explanation for why an event that belongs to a window that was never opened before causes that window to be opened instead of being dropped immediately.

Given that our windows are 1 hour long and forBoundedOutOfOrderness is 10 minutes, here is an example:

event1 = ("2022-01-01T08:25:00Z") => window fired

event2 = ("2022-01-01T09:25:00Z") => window created but not fired, as expected

event3 = ("2022-01-01T05:25:00Z") => aggregated with event4 instead of being dropped (why?)

event4 = ("2022-01-01T05:40:00Z") => aggregated with event3 instead of being dropped (why?)
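
To make the expectation concrete, here is a minimal plain-Scala sketch of the window and watermark arithmetic for these four events (no Flink dependency). It assumes windowStartingOffsetMinutes is 0 and no allowed lateness; the object and method names are hypothetical. It shows that event3 and event4 fall into the 05:00-06:00 window, and that a watermark derived from event2 (09:25 minus 10 minutes, minus 1 ms) is already past that window's last millisecond, so both events would be dropped if that watermark had reached the window operator first.

```scala
import java.time.{Duration, Instant}

object WindowArithmetic {
  // Assumed from the question: 1-hour tumbling windows, 10 minutes of out-of-orderness.
  // windowStartingOffsetMinutes is assumed to be 0 in this sketch.
  val windowSize: Long = Duration.ofHours(1).toMillis
  val outOfOrderness: Long = Duration.ofMinutes(10).toMillis

  def ms(iso: String): Long = Instant.parse(iso).toEpochMilli

  // Start of the tumbling window that contains ts (offset 0).
  def windowStart(ts: Long): Long = ts - Math.floorMod(ts, windowSize)

  // Last millisecond belonging to the window [start, start + size).
  def windowMaxTimestamp(ts: Long): Long = windowStart(ts) + windowSize - 1

  // Watermark a bounded-out-of-orderness generator emits after seeing maxTs:
  // the highest timestamp minus the out-of-orderness bound, minus 1 ms.
  def watermarkAfter(maxTs: Long): Long = maxTs - outOfOrderness - 1

  // With no allowed lateness, the window containing ts is finished (fired and purged)
  // once its max timestamp is <= the watermark; elements arriving later for it are dropped.
  def windowClosed(ts: Long, watermark: Long): Boolean = windowMaxTimestamp(ts) <= watermark

  def main(args: Array[String]): Unit = {
    val wm = watermarkAfter(ms("2022-01-01T09:25:00Z")) // watermark derived from event2
    val events = Seq("2022-01-01T08:25:00Z", "2022-01-01T09:25:00Z",
                     "2022-01-01T05:25:00Z", "2022-01-01T05:40:00Z")
    for (e <- events) {
      val start = Instant.ofEpochMilli(windowStart(ms(e)))
      println(s"$e -> window starting at $start, closed by watermark: ${windowClosed(ms(e), wm)}")
    }
  }
}
```

For event1, "closed" means its window has fired; for event3 and event4 it means they would be dropped if they arrived after that watermark.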

    val stream = env
      .fromSource(
        kafkaSource,
        WatermarkStrategy
          .forBoundedOutOfOrderness[(String, EnrichedProcess, KafkaHeaders)](Duration.ofMinutes(outOfOrdernessMinutes)) //Watermark max time for late events
          .withIdleness(Duration.ofSeconds(idleness))
          .withTimestampAssigner(new SerializableTimestampAssigner[(String, EnrichedProcess, KafkaHeaders)] {
            override def extractTimestamp(element: (String, EnrichedProcess, KafkaHeaders), recordTimestamp: Long)
                : Long = {
              logger.info(
                LogMessage(
                  element._3.orgId,
                  s"Received incoming EnrichedProcess update_time: ${element._3.updateTime}, process time: ${recordTimestamp.asDate}",
                  element._3.flowId
                )
              )
              element._3.updateTime.asEpoch
            }
          }),
        s"Source - $kConsumeTopic"
      )

    stream
      .keyBy(element => (element._2.orgId -> element._2.procUid))
      .window(TumblingEventTimeWindows.of(Time.hours(tumblingWindowHours), Time.minutes(windowStartingOffsetMinutes)))
      .reduce(new ReduceFunc)
      .name("Aggregated EnrichedProcess")
      .sinkTo(kafkaConnector.createKafkaSink(kProducerServers, kProduceTopic))
      .name(s"Sink -> $kProduceTopic")

Edit: I'm testing this with integration tests using Docker Compose. I produce an event to Kafka, the Flink job consumes it and sinks the result to Kafka, and then I check the content of the output Kafka topic.

When I put a 30-second sleep after sending event2 and another after sending event3, event3 and event4 are dropped. This is the behaviour I was expecting.

    val producer = new Producer(producerTopic)

    val consumer = new Consumer(consumerTopic, groupId)
    producer.send(event1)
    producer.send(event2)
    Thread.sleep(30000)
    producer.send(event3)
    Thread.sleep(30000)
    producer.send(event4)

    val received: Iterable[(String, EnrichedProcess)] = consumer.receive[EnrichedProcess]()

What puzzles me even more is that when I use a 10-second sleep instead of 30, I only get the first behaviour (event3 and event4 are aggregated). The watermark should already have been updated by then, since the default interval of the periodic watermark generator is 200 ms.


Solution

  • Executive summary:

    Non-determinism in event-time-based logic with Flink comes from mixing processing time with event time, as happens with periodic watermark generators and idleness detection. Only if you never have late events or idle sources can you be sure of deterministic results.

    More details:

    While you would expect event3 = ("2022-01-01T05:25:00Z") to be late, it will only truly be late if a large enough watermark has already reached the window operator. With the forBoundedOutOfOrderness strategy there's no guarantee of that: it is a periodic watermark generator that emits watermarks every 200 ms of processing time. So it could be that a watermark based on the timestamp of event2 is produced between event3 and event4.
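
    The race can be illustrated with a small single-channel simulation in plain Scala. This is a hypothetical stand-in, not Flink's own class: events only update the maximum timestamp seen, and a watermark (max minus out-of-orderness minus 1 ms) is emitted only on a periodic tick. Where the tick falls relative to the events decides whether event3 and event4 are dropped. It assumes 1-hour windows with offset 0 and no allowed lateness.

```scala
import java.time.Instant

object PeriodicWatermarkRace {
  // Hypothetical stand-in for a bounded-out-of-orderness generator: events only update
  // the max timestamp; a watermark is emitted only on a periodic (processing-time) tick.
  final class BoundedOutOfOrdernessSim(outOfOrdernessMs: Long) {
    private var maxTs: Long = Long.MinValue + outOfOrdernessMs + 1
    def onEvent(ts: Long): Unit = { maxTs = math.max(maxTs, ts) }
    def onPeriodicEmit(): Long = maxTs - outOfOrdernessMs - 1
  }

  val windowMs: Long = 60L * 60 * 1000
  val outOfOrdernessMs: Long = 10L * 60 * 1000

  // Last millisecond of the 1-hour tumbling window (offset 0) that contains ts.
  def windowLastMs(ts: Long): Long = ts - Math.floorMod(ts, windowMs) + windowMs - 1

  // Feeds events in order; a periodic tick happens right after each event whose index is in
  // tickAfter. Returns, per event, whether its window was already closed on arrival (dropped).
  def run(events: Seq[Long], tickAfter: Set[Int]): Seq[Boolean] = {
    val gen = new BoundedOutOfOrdernessSim(outOfOrdernessMs)
    var currentWatermark = Long.MinValue
    events.zipWithIndex.map { case (ts, i) =>
      val dropped = windowLastMs(ts) <= currentWatermark
      gen.onEvent(ts)
      if (tickAfter.contains(i)) currentWatermark = math.max(currentWatermark, gen.onPeriodicEmit())
      dropped
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq("2022-01-01T08:25:00Z", "2022-01-01T09:25:00Z",
                     "2022-01-01T05:25:00Z", "2022-01-01T05:40:00Z").map(s => Instant.parse(s).toEpochMilli)
    println(run(events, tickAfter = Set(1))) // tick right after event2
    println(run(events, tickAfter = Set(2))) // tick right after event3
    println(run(events, tickAfter = Set(3))) // no tick until all four events are in
  }
}
```

    With the tick after event2 both late events are dropped; with the tick after event3 only event4 is dropped; with no tick in between, both are accepted and aggregated.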

    That's one possible explanation; there may be others depending on the exact circumstances. For example, with all that sleeping going on, one of the parallel instances of the watermark generator is idle for at least a minute, which may be involved in producing the behavior being observed (depending on the value of idleness, etc.).

    More background:

    When the parallelism is greater than 1, there are multiple independent instances of the watermark strategy, each generating watermarks from the events it processes.

    Operators with multiple input channels, such as the keyed window, will combine these watermarks by taking the minimum of the incoming watermarks (from all non-idle channels) as their own watermark.
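
    The combining rule can be sketched in a few lines of plain Scala. The Channel type is hypothetical and this ignores details such as Flink never letting an operator's watermark move backwards. It shows how a single input that has seen no events yet, and has not yet been marked idle, holds the window operator's watermark at Long.MinValue.

```scala
object CombinedWatermark {
  // One input of a downstream operator: the last watermark received on it and whether
  // it is currently marked idle (e.g. by withIdleness after no events for the configured time).
  final case class Channel(watermark: Long, idle: Boolean)

  // The operator's watermark is the minimum over all non-idle inputs.
  // If every input is idle, this sketch returns None: no active input can advance it.
  // (Flink additionally never lets an operator's watermark go backwards; not modelled here.)
  def combined(channels: Seq[Channel]): Option[Long] = {
    val active = channels.filterNot(_.idle)
    if (active.isEmpty) None else Some(active.map(_.watermark).min)
  }
}
```

    For example, if one input has seen event2 (watermark 2022-01-01T09:14:59.999Z) while another has received nothing and is not yet idle (watermark Long.MinValue), the combined watermark stays at Long.MinValue and event3 and event4 are not late. Once the second input is marked idle, the combined watermark jumps to the first input's value and both events would be dropped. This is one way a sleep shorter than the idleness timeout could behave differently from a longer one; whether it applies here depends on the partition count, parallelism and the value of idleness.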

    Answering the original question:

    Does Flink retain the state for windows that have already been closed? No. Once the allowed lateness (if any) has expired, the state for an event time window is purged.
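
    As a rough illustration of that purge rule, here is a plain-Scala sketch (hypothetical names, not Flink's API) of when an event-time window's state becomes eligible for cleanup: its last millisecond plus the allowed lateness, clamped to Long.MaxValue on overflow.

```scala
object WindowCleanup {
  // For an event-time window [start, end), the last timestamp it covers is end - 1.
  // Its state is cleared once the watermark reaches that timestamp plus the allowed lateness.
  // On overflow the result is clamped to Long.MaxValue (such a window is never cleaned up).
  def cleanupTime(windowEndExclusive: Long, allowedLatenessMs: Long): Long = {
    val maxTimestamp = windowEndExclusive - 1
    val t = maxTimestamp + allowedLatenessMs
    if (t >= maxTimestamp) t else Long.MaxValue
  }

  // True once the watermark has passed the cleanup time, i.e. the window's state is gone.
  def isPurged(windowEndExclusive: Long, allowedLatenessMs: Long, watermark: Long): Boolean =
    cleanupTime(windowEndExclusive, allowedLatenessMs) <= watermark
}
```

    For event3's window (ending at 06:00) with no allowed lateness, once a watermark at or beyond 05:59:59.999 has reached the window operator, the window's state is gone, and an element arriving afterwards for that window is dropped rather than reopening it.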