Tags: flink-streaming, flink-cep

Why does AssignerWithPunctuatedWatermarks not work in my data stream?


I have a problem.
I call .assignTimestampsAndWatermarks(new MyAssignerWithPunctuatedWatermarks(60000)) in the middle of my program (after some filters, maps and other Apache Flink operators).
I have a simple Pattern:

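import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

Pattern<Event, ?> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipToLast("end"))
  .where(new SimpleConditionA())
  .followedBy("end")
  .where(new SimpleConditionB())
  .within(Time.minutes(5));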

I read messages from Apache Kafka (the event timestamps are below):
Message A: A.timestamp = 11:50:00
Message B: B.timestamp = 11:51:00

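import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MyAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<Event> {

  private final long maxOutOfOrderness;   // in milliseconds, e.g. 60000
  private long currentMaxTimestamp;       // highest event timestamp seen so far

  public MyAssignerWithPunctuatedWatermarks(long maxOutOfOrderness) {
    this.maxOutOfOrderness = maxOutOfOrderness;
  }

  @Override
  public long extractTimestamp(Event e, long previousElementTimestamp) {
    long timestamp = e.timestamp;
    if (timestamp > currentMaxTimestamp) {
      currentMaxTimestamp = timestamp;
    }
    return timestamp;
  }

  @Override
  public Watermark checkAndGetNextWatermark(Event e, long extractedTimestamp) {
    // called after every event: the watermark trails the highest timestamp by maxOutOfOrderness
    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  }
}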
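To see what this assigner actually emits for Message A and Message B, here is a small plain-Java trace. It is a hypothetical stand-in, not Flink code: the class name `PunctuatedWatermarkTrace` and its helpers are made up for illustration, and it only reproduces the watermark rule `currentMaxTimestamp - maxOutOfOrderness` from the class above, using milliseconds since midnight instead of epoch milliseconds.

```java
import java.time.LocalTime;

/**
 * Hypothetical re-creation (no Flink classes) of the assigner's logic: track the
 * highest timestamp seen and emit currentMaxTimestamp - maxOutOfOrderness after every event.
 */
public class PunctuatedWatermarkTrace {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    public PunctuatedWatermarkTrace(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Mirrors extractTimestamp followed by checkAndGetNextWatermark; returns the new watermark. */
    public long onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    /** "HH:mm:ss" to milliseconds since midnight (a stand-in for epoch millis). */
    public static long millis(String hhmmss) {
        return LocalTime.parse(hhmmss).toSecondOfDay() * 1000L;
    }

    /** Milliseconds since midnight back to a readable time. */
    public static String time(long millis) {
        return LocalTime.ofSecondOfDay(millis / 1000).toString();
    }

    public static void main(String[] args) {
        PunctuatedWatermarkTrace assigner = new PunctuatedWatermarkTrace(60_000);
        long wmA = assigner.onEvent(millis("11:50:00"));
        long wmB = assigner.onEvent(millis("11:51:00"));
        System.out.println("after A: " + time(wmA)); // after A: 11:49
        System.out.println("after B: " + time(wmB)); // after B: 11:50
    }
}
```

After Message B the watermark is only 11:50:00, which is still earlier than B's own timestamp of 11:51:00.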

Then I put Message A into Kafka, and it was processed OK.
Then I put Message B into Kafka, and the pattern was not matched. (I log inside SimpleConditionB, and Message B never reaches that condition.) Why?
When I used .assignTimestampsAndWatermarks(new IngestionTimeExtractor()) instead, everything worked. But I need to accept late events (events that are at most 1 minute late).


Solution

  • assignTimestampsAndWatermarks must be applied before any operations that depend on timing. CEP relies on the timestamps and watermarks it provides in order to sort the input stream and to enforce the within(duration) constraint.

    In order to correctly match a Pattern against a possibly out-of-order event time stream, CEP must first sort the incoming events by their timestamps. As part of this, each incoming event is held in a buffer until the current watermark has passed its timestamp, because until then an earlier event might still arrive without being considered late. Late events, whose timestamps are behind the current watermark, are either dropped by CEP or sent to a side output (if one is configured).

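    So, for Message B to be processed by your Pattern, you must first inject an event whose timestamp is at least maxOutOfOrderness later than 11:51:00 (the time of Message B). Your assigner only emits a new watermark when an event arrives, and that watermark is currentMaxTimestamp - maxOutOfOrderness. With maxOutOfOrderness = 60000 ms, Message B is released only after an event with a timestamp of 11:52:00 or later arrives.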

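    With an IngestionTimeExtractor the events cannot be out of order: each event is stamped with its arrival time and the watermark follows the wall clock, so the watermark passes each event's timestamp shortly after it arrives, even if no further events come in. That is why Message B was processed without waiting for a later event.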
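The buffering described in this answer can be sketched in plain Java. This is a simplified, hypothetical model, not Flink's actual CEP operator: the class `CepSortBufferDemo` and its methods are made up for illustration. It assumes watermarks follow the question's punctuated rule (`maxTimestampSeen - maxOutOfOrderness`, emitted once per event), buffers events in timestamp order until the watermark reaches them, and records events that arrive behind the watermark as late.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Simplified, hypothetical model of CEP's event-time sorting (plain Java, not Flink code).
 * Watermarks follow the question's punctuated rule and are only emitted when an event arrives.
 */
public class CepSortBufferDemo {

    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long currentWatermark = Long.MIN_VALUE;
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // ordered by timestamp
    private final List<Long> late = new ArrayList<>();

    public CepSortBufferDemo(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /**
     * Ingests one event, then applies the watermark it triggers.
     * Returns the buffered events, in timestamp order, that this watermark releases.
     */
    public List<Long> onEvent(long timestamp) {
        if (timestamp <= currentWatermark) {
            late.add(timestamp);   // CEP would drop this or send it to a side output
        } else {
            buffer.add(timestamp); // held until the watermark reaches it
        }
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        currentWatermark = Math.max(currentWatermark, maxTimestampSeen - maxOutOfOrderness);

        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= currentWatermark) {
            released.add(buffer.poll());
        }
        return released;
    }

    public List<Long> lateEvents() {
        return late;
    }

    /** Converts "HH:mm:ss" to milliseconds since midnight for readability. */
    public static long at(String hhmmss) {
        return LocalTime.parse(hhmmss).toSecondOfDay() * 1000L;
    }

    public static void main(String[] args) {
        CepSortBufferDemo cep = new CepSortBufferDemo(60_000);
        System.out.println(cep.onEvent(at("11:50:00")).size()); // A arrives: 0 released
        System.out.println(cep.onEvent(at("11:51:00")).size()); // B arrives: 1 released (A)
        System.out.println(cep.onEvent(at("11:52:00")).size()); // 11:52:00 arrives: 1 released (B)
        System.out.println(cep.onEvent(at("11:50:30")).size()); // behind the watermark: 0 released
        System.out.println(cep.lateEvents().size());            // 1 late event recorded
    }
}
```

In this model, Message A reaches the pattern as soon as B arrives, which matches what the question observed, while B itself waits until an event at 11:52:00 or later pushes the watermark to 11:51:00. The event at 11:52:00 is then buffered in turn, waiting for a still later event.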