If I emit an event from an operator after holding it in state for a certain duration, will the downstream operator accept it if it is past the watermark?


I have a Flink job with two sources.

Both sources are keyed by a join key, and a process function joins the two streams. Sometimes the data is up to 15 minutes late, so I hold events in state for 15 minutes using timers and emit them when the timer fires. In most cases an event is not held at all: it is joined and emitted right away. In some cases, though, it stays in state for the full 15 minutes.

Meanwhile, the watermarks from the sources keep flowing through. My question: if an event is joined and emitted after 15 minutes, will the downstream operator, say the sink, accept it?

I see that it is being accepted. The sink's low watermark is higher than the event time of the event that was emitted 15 minutes late. The watermark is higher because, even though some events were stuck in state for 15 minutes, other events with higher timestamps kept flowing through, so the watermark kept advancing.
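
A rough way to see how the sink's watermark overtakes a held event is the plain-Python model below (a toy, not Flink or PyFlink API). It assumes both sources use a bounded-out-of-orderness watermark strategy, which computes the watermark as the highest event time seen so far minus the bound minus 1 ms, and that a two-input operator forwards the minimum of its input watermarks. The 5-second bound, the one-event-per-minute traffic on other keys, and the names BoundedOutOfOrdernessModel, Event, and simulate_held_event are hypothetical; the model also assumes, as described above, that the released event keeps its original event time.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

MINUTE_MS = 60_000


class BoundedOutOfOrdernessModel:
    """Toy watermark generator: watermark = max event time seen - bound - 1."""

    def __init__(self, bound_ms: int) -> None:
        self.bound_ms = bound_ms
        self.max_ts: Optional[int] = None

    def on_event(self, ts_ms: int) -> None:
        self.max_ts = ts_ms if self.max_ts is None else max(self.max_ts, ts_ms)

    def current_watermark(self) -> Optional[int]:
        if self.max_ts is None:
            return None  # no events yet, so no watermark
        return self.max_ts - self.bound_ms - 1


@dataclass
class Event:
    key: str
    event_time_ms: int


def simulate_held_event(bound_ms: int = 5_000, hold_minutes: int = 15) -> Tuple[int, int]:
    """Hold one unmatched event while other keys keep flowing, then release it.

    Returns (event time of the held event, downstream watermark at release).
    The 5 s default bound is a hypothetical value, not taken from the question.
    """
    left = BoundedOutOfOrdernessModel(bound_ms)
    right = BoundedOutOfOrdernessModel(bound_ms)

    held = Event("k1", event_time_ms=0)  # arrives on the left, no match yet
    left.on_event(held.event_time_ms)

    for minute in range(1, hold_minutes + 1):
        # Other keys arrive on both sides and join immediately,
        # so both source watermarks keep advancing.
        left.on_event(minute * MINUTE_MS)
        right.on_event(minute * MINUTE_MS)

    # A two-input operator forwards the minimum of its input watermarks.
    downstream_wm = min(left.current_watermark(), right.current_watermark())
    # The timer fires and the held event is emitted with its original event time.
    return held.event_time_ms, downstream_wm
```

With the defaults, the held event has event time 0 but is released when the downstream watermark is 894,999 ms, almost 15 minutes ahead of it.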

Why does the sink accept this event even though it is behind the sink's watermark?


Solution

  • If I emit an event from an operator after holding it in state for a certain duration, will the downstream operator accept it if it is past the watermark?

    That depends on the behavior of the downstream operator.

    Some operators pay attention to timestamps and watermarks, and some don't. The operators affected by watermarks include:

    • windows
    • temporal and interval joins
    • CEP and match_recognize

    In some cases these operators silently drop late events; in others they offer the option of sending late events to a side output. DataStream windows can also process late events and re-emit updated results, as long as the events are no later than the configured "allowed lateness".

    These operators behave this way because they hold state that they must eventually clear, and they rely on watermarks to know when it is safe to clear it.
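
    Here is a toy model of that window behaviour, in plain Python rather than Flink code: an event-time tumbling window that counts elements, fires when the watermark reaches the window's last timestamp, re-emits for late elements within the allowed lateness, clears its state once the watermark passes the window end plus the allowed lateness, and after that routes late elements to a side-output list. It loosely mirrors what allowedLateness(...) and sideOutputLateData(...) configure on a Flink WindowedStream; the class name, the one-minute window, and the 30-second lateness in the demo are hypothetical.

```python
from typing import Dict, List, Set, Tuple


class TumblingWindowCountModel:
    """Toy event-time tumbling window that counts elements per window.

    Hypothetical model, not Flink's WindowOperator: it only illustrates
    firing, allowed lateness, state cleanup, and a late-data side output.
    """

    def __init__(self, size_ms: int, allowed_lateness_ms: int = 0) -> None:
        self.size_ms = size_ms
        self.allowed_lateness_ms = allowed_lateness_ms
        self.watermark: float = float("-inf")      # no watermark received yet
        self.state: Dict[int, int] = {}            # window start -> count
        self.fired: Set[int] = set()               # windows that emitted at least once
        self.results: List[Tuple[int, int]] = []   # (window start, count), incl. re-emits
        self.late_side_output: List[int] = []      # timestamps of too-late elements

    def _max_ts(self, start: int) -> int:
        return start + self.size_ms - 1  # last timestamp belonging to the window

    def on_element(self, ts_ms: int) -> None:
        start = ts_ms - ts_ms % self.size_ms
        if self._max_ts(start) + self.allowed_lateness_ms <= self.watermark:
            # The window's state is already cleared: the element is too late.
            self.late_side_output.append(ts_ms)
            return
        self.state[start] = self.state.get(start, 0) + 1
        if self._max_ts(start) <= self.watermark:
            # Late, but within allowed lateness: emit an updated result now.
            self.results.append((start, self.state[start]))
            self.fired.add(start)

    def on_watermark(self, wm_ms: int) -> None:
        self.watermark = wm_ms
        for start in sorted(self.state):
            if start not in self.fired and self._max_ts(start) <= wm_ms:
                self.results.append((start, self.state[start]))  # regular firing
                self.fired.add(start)
        # Cleanup: drop state once the watermark passes end + allowed lateness.
        expired = [s for s in self.state
                   if self._max_ts(s) + self.allowed_lateness_ms <= wm_ms]
        for start in expired:
            del self.state[start]
            self.fired.discard(start)


if __name__ == "__main__":
    w = TumblingWindowCountModel(size_ms=60_000, allowed_lateness_ms=30_000)
    w.on_element(10_000)    # goes into window [0, 60000)
    w.on_watermark(60_000)  # window fires: (0, 1)
    w.on_element(20_000)    # late but within lateness: re-emits (0, 2)
    w.on_watermark(90_000)  # passes 59_999 + 30_000: state cleared
    w.on_element(30_000)    # too late: side output
    print(w.results, w.late_side_output)
```

    The last element goes to the side output only because the window had already cleared its state for that time range.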

    Other operators, such as sinks, maps, and filters, ignore timestamps and watermarks. They aren't at risk of accumulating too much state, so they process every event regardless of how late it is. This is why your sink accepts the event that was held in state for 15 minutes.
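
    For contrast, a plain-Python model of the stateless path: the map and the sink never consult the watermark, so a record that is almost 15 minutes behind it still reaches the sink. If you did want to discard such records, you would have to add the check yourself; in Flink that could be done in a ProcessFunction by comparing ctx.timestamp() with ctx.timerService().currentWatermark(). The function name run_stateless_pipeline, the drop_late flag, and the sample records are hypothetical.

```python
from typing import List, Tuple

Record = Tuple[int, str]  # (event time in ms, payload)


def run_stateless_pipeline(records: List[Record], watermark_ms: int,
                           drop_late: bool = False) -> List[Record]:
    """Pass records through a map and a sink, neither of which reads the watermark.

    drop_late enables a hypothetical, user-written late filter; nothing like it
    runs unless you add it yourself.
    """
    sink: List[Record] = []
    for ts_ms, payload in records:
        mapped = (ts_ms, payload.upper())  # map: stateless, ignores timestamps
        if drop_late and ts_ms <= watermark_ms:
            continue  # opt-in: treat records at or behind the watermark as late
        sink.append(mapped)  # sink: writes whatever arrives
    return sink


if __name__ == "__main__":
    records = [(0, "held"), (900_000, "fresh")]
    print(run_stateless_pipeline(records, watermark_ms=894_999))
    print(run_stateless_pipeline(records, watermark_ms=894_999, drop_late=True))
```

    Without the opt-in filter both records reach the sink; with it, only the record ahead of the watermark does.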