apache-flink flink-streaming

Flink interval join DataStream with KafkaSource drops all records


Current Configuration

  • Application runs on Flink 1.14.4
  • Within the application, a data stream (the result of a long chain of operators) is interval-joined with a Kafka source.
  • Event timestamps per partition are strictly ascending.
  • The per-partition watermark strategy on Kafka sets the watermark to the maximum timestamp it has seen so far (bounded out-of-orderness = 1 sec).
  • By default, the Kafka source (the right side of the interval join) is about a minute ahead of the other data stream (kafka > datastream).
  • The Kafka source produces data once per minute, so its partitions can be considered idle for at most 59 seconds. (The withIdleness option is not used.)
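
To make the watermark setup above concrete, here is a minimal plain-Java sketch of a bounded-out-of-orderness generator. It does not use Flink; the class name BoundedOutOfOrdernessSketch and the sample timestamps are hypothetical, and the formula (maximum timestamp seen, minus the allowed delay, minus 1 ms) reflects my understanding of how Flink's built-in bounded-out-of-orderness generator behaves.

```java
import java.time.Duration;

// Plain-Java model of a bounded-out-of-orderness watermark generator.
// Hypothetical illustration only; this is not Flink code.
final class BoundedOutOfOrdernessSketch {
    private final long delayMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.delayMillis = maxOutOfOrderness.toMillis();
        // Start high enough that the first watermark computation cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1;
    }

    // Track the largest event timestamp seen so far; older events do not lower it.
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    // Watermark trails the maximum timestamp by the allowed delay plus 1 ms.
    long currentWatermark() {
        return maxTimestamp - delayMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator =
                new BoundedOutOfOrdernessSketch(Duration.ofSeconds(1));
        generator.onEvent(60_000L);
        System.out.println(generator.currentWatermark());
        generator.onEvent(120_000L);
        System.out.println(generator.currentWatermark());
    }
}
```

With one event per minute, the watermark of such a generator only advances once per minute, which is why the partitions look idle for most of that interval.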

Problem

  • This setup worked fine until recently, when I replaced the deprecated FlinkKafkaConsumer class with the new KafkaSource class.
  • The output watermark of the interval join operator should be (and was) the minimum of the watermarks of the two incoming streams (Kafka and the data stream), but with KafkaSource it is set to the maximum after a certain period of time. As a result, all records from the data stream are dropped because their timestamps are behind the watermark (= that of the Kafka source).

Question

  • Does KafkaSource behave differently in a window join when the stream speeds are not in sync?
  • Can the watermark of a window join operator be the maximum of the two incoming streams?


Solution

  • This issue is fixed in https://issues.apache.org/jira/browse/FLINK-28975. Using KafkaSource with WatermarksWithIdleness has an issue with how the source is marked as idle.
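
To illustrate why a wrong idle status can produce the symptom in the question, here is a small plain-Java sketch of how a two-input operator combines watermarks. It assumes the usual rule that the operator watermark is the minimum over inputs that are not marked idle, and that the interval join treats a record as late when its timestamp is below the operator watermark. The class CombinedWatermarkSketch, its methods, and the sample values are hypothetical, and which input is actually marked idle by the bug is not stated above; the data stream side is chosen here purely for illustration.

```java
import java.util.OptionalLong;

// Plain-Java model of watermark combination in a two-input operator.
// Hypothetical illustration only; this is not Flink code.
final class CombinedWatermarkSketch {

    // Minimum over the inputs that are not idle; empty if both are idle.
    static OptionalLong combine(long leftWm, boolean leftIdle,
                                long rightWm, boolean rightIdle) {
        if (leftIdle && rightIdle) {
            return OptionalLong.empty();
        }
        if (leftIdle) {
            return OptionalLong.of(rightWm);
        }
        if (rightIdle) {
            return OptionalLong.of(leftWm);
        }
        return OptionalLong.of(Math.min(leftWm, rightWm));
    }

    // A record is late when its timestamp is behind the operator watermark.
    static boolean isLate(long recordTimestamp, long operatorWatermark) {
        return recordTimestamp < operatorWatermark;
    }

    public static void main(String[] args) {
        long dataStreamWm = 1_000_000L; // left input, about a minute behind
        long kafkaWm = 1_060_000L;      // right input, ahead
        long recordTs = 1_000_500L;     // record arriving on the data stream

        long healthy = combine(dataStreamWm, false, kafkaWm, false).getAsLong();
        long oneIdle = combine(dataStreamWm, true, kafkaWm, false).getAsLong();

        System.out.println("both active: wm=" + healthy + " late=" + isLate(recordTs, healthy));
        System.out.println("left idle:   wm=" + oneIdle + " late=" + isLate(recordTs, oneIdle));
    }
}
```

In this model, as soon as the slower input is ignored, the operator watermark jumps to the faster input's watermark, and every record from the slower stream is classified as late.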