Tags: java, apache-flink, flink-streaming, amazon-kinesis-analytics

Why does my watermark not advance in my Apache Flink keyed stream?


I am using Apache Flink 1.13.2 with Java for my streaming application. I am using a keyed process function with no window function. I have set up a watermark strategy and the autoWatermarkInterval config as described in the documentation, but my watermark is not advancing.

I have double-checked this in the Flink web UI and by printing the current watermark in my EventProcessor KeyedProcessFunction, but the watermark always stays at -9223372036854775808 (Long.MIN_VALUE, the lowest possible watermark).

env.getConfig().setAutoWatermarkInterval(1000);

WatermarkStrategy<EventPayload> watermarkStrategy = WatermarkStrategy
        .<EventPayload>forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

DataStream<EventPayload> deserialized = input
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .flatMap(new Deserializer());

DataStream<EnrichedEventPayload> resultStream =
        AsyncDataStream.orderedWait(deserialized, new Enrichment(), 5, TimeUnit.SECONDS, 100);

DataStream<Session> eventsStream = resultStream
        .filter(EnrichedEventPayload::getIsEnriched)
        .keyBy(EnrichedEventPayload::getId)
        .process(new EventProcessor());

I also tried applying the WatermarkStrategy after the keyBy (adjusting the types to match), but still no luck.

DataStream<Session> eventsStream = resultStream
        .filter(EnrichedEventPayload::getIsEnriched)
        .keyBy(EnrichedEventPayload::getId)
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .process(new EventProcessor());

I have also tried using my own class implementing WatermarkStrategy and set breakpoints on the onEvent function to confirm that a new watermark was being emitted, but the watermark still did not advance (and the associated timers did not fire).

Any help would be greatly appreciated!


Solution

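  • This happens when one of the parallel instances of the watermark strategy is idle, i.e. no events are flowing through it. An operator's watermark is the minimum of the watermarks on all of its input channels, so a single instance that has never emitted a watermark holds everything downstream at Long.MIN_VALUE. One way to solve this is to add withIdleness(...) to the watermark strategy, for example .withIdleness(Duration.ofMinutes(1)), so that an instance that sees no events for that long is marked idle and no longer holds back the watermark.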
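
To see why one idle instance is enough to pin the watermark, here is a minimal plain-Java sketch of the "minimum over all non-idle inputs" rule. It does not use the Flink API and is not Flink's actual implementation: the class WatermarkMinDemo, the method combinedWatermark, and the sample values are hypothetical names and numbers chosen for illustration only.

```java
// Hypothetical, simplified model of how a downstream operator combines
// the watermarks of its parallel inputs. Not Flink code.
class WatermarkMinDemo {
    // Same value Flink reports before any watermark has arrived.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Returns the minimum watermark over all inputs that are not marked idle.
    // If every input is idle, this sketch simply returns NO_WATERMARK
    // (a simplification made for this example).
    static long combinedWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs are ignored when taking the minimum
            }
            anyActive = true;
            min = Math.min(min, inputWatermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Two instances have seen events; the third has never emitted a watermark.
        long[] watermarks = {1_000L, 2_000L, NO_WATERMARK};

        // Without idleness detection the empty instance counts as active.
        boolean[] noneIdle = {false, false, false};
        System.out.println(combinedWatermark(watermarks, noneIdle)); // -9223372036854775808

        // With the third instance marked idle, the minimum is taken over the rest.
        boolean[] thirdIdle = {false, false, true};
        System.out.println(combinedWatermark(watermarks, thirdIdle)); // 1000
    }
}
```

The first call reproduces the symptom from the question: the combined watermark stays at Long.MIN_VALUE no matter how far the busy instances advance. Marking the empty instance idle, which is roughly what withIdleness(...) arranges after the configured timeout, lets the watermark follow the slowest active instance instead.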