Flink interval join DataStream with KafkaSource drops all records
Tags: apache-flink, flink-streaming
Current Configuration
- The application runs on Flink 1.14.4.
- Within the application, a data stream (the result of a long chain of operators) is interval-joined with a Kafka source.
- Event timestamps within each partition are strictly ascending.
- The per-partition watermark strategy on the Kafka source tracks the maximum timestamp seen so far, with a bounded out-of-orderness of 1 second.
- The Kafka source (the right side of the interval join) is typically about a minute ahead of the other data stream (Kafka > data stream).
- The Kafka source produces data once every minute, so its partitions can appear idle for up to 59 seconds. (The withIdleness option is not used.)
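To make the watermark setup above concrete, here is a minimal plain-Java sketch of per-partition bounded-out-of-orderness watermarking. It is a simplified model, not Flink's own classes: the class name `PartitionWatermarkModel` and its methods are made up for illustration. It assumes the behaviour of Flink's bounded-out-of-orderness generator, where a partition's watermark is its maximum timestamp minus the bound minus 1 ms, and that the source's watermark is the minimum over its partitions.

```java
import java.util.Arrays;

/** Simplified model of per-partition watermarking; hypothetical class, not part of Flink. */
final class PartitionWatermarkModel {
    private static final long NO_EVENT = Long.MIN_VALUE;

    private final long boundMs;           // allowed out-of-orderness in milliseconds
    private final long[] maxTimestampMs;  // highest event timestamp seen per partition

    PartitionWatermarkModel(int partitions, long boundMs) {
        if (partitions < 1) {
            throw new IllegalArgumentException("need at least one partition");
        }
        this.boundMs = boundMs;
        this.maxTimestampMs = new long[partitions];
        Arrays.fill(maxTimestampMs, NO_EVENT);
    }

    /** Records one event timestamp (epoch millis) for a partition. */
    void onEvent(int partition, long timestampMs) {
        maxTimestampMs[partition] = Math.max(maxTimestampMs[partition], timestampMs);
    }

    /** Watermark of a single partition: max timestamp - bound - 1 ms. */
    long partitionWatermark(int partition) {
        long max = maxTimestampMs[partition];
        return max == NO_EVENT ? Long.MIN_VALUE : max - boundMs - 1;
    }

    /** Watermark of the whole source: the minimum over all partitions. */
    long sourceWatermark() {
        long min = Long.MAX_VALUE;
        for (int p = 0; p < maxTimestampMs.length; p++) {
            min = Math.min(min, partitionWatermark(p));
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionWatermarkModel source = new PartitionWatermarkModel(2, 1_000L);
        source.onEvent(0, 60_000L);
        source.onEvent(0, 120_000L);
        source.onEvent(1, 60_000L);
        System.out.println(source.partitionWatermark(0)); // 118999
        System.out.println(source.sourceWatermark());     // 58999, held back by partition 1
    }
}
```

In this model a partition that has not produced an event yet holds the source watermark at `Long.MIN_VALUE`, and a partition that lags by one minute holds it back by one minute.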
Problem
- This setup worked fine until I recently replaced the deprecated FlinkKafkaConsumer class with the new KafkaSource class.
- The output watermark of the interval join operator should be (and was) the minimum of the watermarks of the two incoming streams (Kafka and the data stream), but with KafkaSource it becomes the maximum after a certain period of time. As a result, all records from the data stream are dropped because their timestamps are behind the operator's watermark, which now follows the Kafka source.
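The expected and observed behaviour can be illustrated with a small plain-Java model of a two-input operator. This is only a sketch under stated assumptions, not Flink's implementation: `TwoInputWatermarkModel` and its methods are hypothetical names. It assumes Flink's general rules that the operator watermark is the minimum of the watermarks of its active inputs, that an input marked idle is ignored in that minimum, and that a record is late once its timestamp is below the operator watermark. Whether an idle input is actually what happens in the setup above is not established here; the model only shows one way the watermark can end up following the faster input.

```java
/** Simplified model of watermark combination in a two-input operator; not Flink code. */
final class TwoInputWatermarkModel {
    static final int DATA_STREAM = 0; // left side of the join
    static final int KAFKA = 1;       // right side of the join

    private final long[] inputWatermark = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] idle = {false, false};
    private long operatorWatermark = Long.MIN_VALUE;

    /** Receives a watermark from one input; watermarks never move backwards. */
    void onWatermark(int input, long watermarkMs) {
        inputWatermark[input] = Math.max(inputWatermark[input], watermarkMs);
        recompute();
    }

    /** Marks an input as idle or active (assumption: idle inputs are ignored). */
    void setIdle(int input, boolean isIdle) {
        idle[input] = isIdle;
        recompute();
    }

    private void recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermark.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermark[i]);
            }
        }
        // Simplification: if every input is idle, keep the current watermark.
        if (anyActive) {
            operatorWatermark = Math.max(operatorWatermark, min);
        }
    }

    long currentWatermark() {
        return operatorWatermark;
    }

    /** A record is late (and would be dropped) if it is behind the watermark. */
    boolean isLate(long timestampMs) {
        return operatorWatermark != Long.MIN_VALUE && timestampMs < operatorWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel join = new TwoInputWatermarkModel();
        join.onWatermark(KAFKA, 120_000L);      // Kafka is a minute ahead
        join.onWatermark(DATA_STREAM, 60_000L);
        System.out.println(join.currentWatermark()); // 60000, the minimum
        System.out.println(join.isLate(61_000L));    // false

        join.setIdle(DATA_STREAM, true);        // hypothetical: slower input marked idle
        System.out.println(join.currentWatermark()); // 120000, follows Kafka
        System.out.println(join.isLate(61_000L));    // true
    }
}
```

As long as both inputs are active, the data stream's records stay ahead of the watermark. Once the slower input is excluded from the minimum, the watermark jumps to the Kafka side and the same records count as late.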
Question
- Does KafkaSource behave differently in a window join when the two streams do not progress at the same speed?
- Can the watermark of a window join operator be the maximum of the watermarks of the two incoming streams?

Solution