java, apache-flink, flink-streaming, flink-cep

Flink CEP Pattern does not match the first events after starting the job and always matches the previous set of events


I want to match a CEP Pattern in Flink 1.4.0 Streaming with the following code:

    DataStream<Event> input = inputFromSocket.map(new IncomingMessageProcessor()).filter(new FilterEmptyAndInvalidEvents());

    DataStream<Event> inputFiltered = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
    KeyedStream<Event, String> partitionedInput = inputFiltered.keyBy(new MyKeySelector());

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new ActionCondition("action1"))
    .followedBy("middle").where(new ActionCondition("action2"))
    .followedBy("end").where(new ActionCondition("action3"));

    pattern = pattern.within(Time.seconds(30));

    PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

Event is just a POJO that gets extracted from my custom source (Google PubSub):

    public class Event {
        private UUID id;
        private String action;
        private String senderID;
        private long occurrenceTimeStamp;
        ......
    }

The first filter, FilterEmptyAndInvalidEvents(), just filters out events that have incorrect formatting etc., but this does not occur in this case, as I can verify from the logging output. So every event runs through the MyKeySelector.getKey() method.

The BoundedOutOfOrdernessGenerator just extracts the timestamp from one field:

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {
        private static final Logger LOG = LoggerFactory.getLogger(BoundedOutOfOrdernessGenerator.class);
        private final long maxOutOfOrderness = 5500; // 5.5 seconds

        // Highest event timestamp seen so far; it only changes when an event arrives.
        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Event element, long previousElementTimestamp) {
            long timestamp = element.getOccurrenceTimeStamp();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            Watermark newWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            return newWatermark;
        }
    }

MyKeySelector just extracts a string value out of the field:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyKeySelector implements KeySelector<Event, String> {
        private static final Logger LOG = LoggerFactory.getLogger(MyKeySelector.class);

        @Override
        public String getKey(Event value) throws Exception {
            String senderID = value.getSenderID();
            LOG.info("Partioning event {} by key {}", value, senderID);
            return senderID;
        }
    }

ActionCondition just compares one field of the event with the expected action and looks like this:

    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ActionCondition extends SimpleCondition<Event> {
        private static final Logger LOG = LoggerFactory.getLogger(ActionCondition.class);

        private String filterForCommand = "";

        public ActionCondition(String filterForCommand) {
            this.filterForCommand = filterForCommand;
        }

        @Override
        public boolean filter(Event value) throws Exception {
            LOG.info("Filtering event for {} action: {}", filterForCommand, value);

            if (value == null) {
                return false;
            }

            if (value.getAction() == null) {
                return false;
            }

            if (value.getAction().equals(filterForCommand)) {
                LOG.info("It's a hit for the {} action for event {}", filterForCommand, value);
                return true;
            } else {
                LOG.info("It's a miss for the {} action for event {}", filterForCommand, value);
                return false;
            }
        }
    }

Unfortunately, when starting the job and sending in events that should be matched by the pattern, they are received and partitioned correctly but the CEP Pattern is not matched.

As an example, I send in the following events:

  1. action1
  2. action2
  3. action3

In the log output of the Flink job I can see that the events correctly run through the MyKeySelector.getKey() method, since I added logging output there. So the events seem to appear correctly in the stream, but unfortunately they are not matched by the pattern.

The logging output looks like this:

    FilterEmptyAndInvalidEvents   - Letting event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 through
    MyKeySelector  - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
    FilterEmptyAndInvalidEvents   - Letting event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 through
    MyKeySelector  - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
    FilterEmptyAndInvalidEvents   - Letting event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 through
    MyKeySelector  - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
    MyKeySelector  - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
    MyKeySelector  - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
    MyKeySelector  - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA

TimeCharacteristic is set to EventTime via

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

and the events contain a correct timestamp.

If I now send in another 3 events with the same actions (but with new timestamps etc.):

  1. action1
  2. action2
  3. action3

the pattern is matched for the first set of events. I know that it is the first set that matches because, for debugging purposes, I tagged every event with a GUID and print the GUIDs of the matched events.

When I send in a 3rd, 4th, ... set of these 3 events, it is always the previous set of events that gets matched. So there seems to be a kind of "offset" in the pattern detection. It does not seem to be a timing issue, though, since the first set of events is not matched either if I wait a long time after sending it (and after seeing the events being partitioned by Flink).

Is there anything wrong with my code, or why does Flink always match only the previous set of events with the pattern?


Solution

  • I sorted it out. I had been looking for the cause at the streaming source, but my event handling was actually fine. The problem was that my watermark did not advance continuously: as you can see in the code above, it only moved forward when an event was received.

    But after the first 3 events, no further events arrived in my setup, so the watermark never advanced again.

    And because no watermark with a timestamp greater than that of the last received event of the sequence was ever emitted, Flink never processed those elements. The reason for this can be found here: Flink CEP - Handling Lateness in Event Time

    The important sentence is:

    ...and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed.

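    Since BoundedOutOfOrdernessGenerator computed the watermark as the highest event timestamp minus 5.5 seconds, the latest watermark was always 5.5 seconds behind the timestamp of the last event, so the last set of events stayed buffered and was never processed. Only when the next set arrived (more than 5.5 seconds later) did its timestamps push the watermark past the previous set, which is why it was always the previous set that matched.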
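
    To make the effect concrete, here is a toy model in plain Java with no Flink dependency. It is a simplification, not Flink's CEP operator: the hypothetical class CepBufferSimulation buffers events and releases those whose timestamp is smaller than the watermark, following the sentence quoted above, and it computes the watermark like the original generator (highest timestamp seen minus 5.5 seconds). The timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Toy model, not Flink code: events wait in a buffer until a watermark with a
 * larger timestamp arrives. The watermark mirrors the original generator:
 * highest timestamp seen minus 5.5 seconds.
 */
class CepBufferSimulation {
    static final long MAX_OUT_OF_ORDERNESS = 5_500; // 5.5 seconds, as in the question

    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long currentMaxTimestamp = 0; // same default as the original field
    final List<Long> processed = new ArrayList<>();

    /** An arriving event only raises the maximum timestamp and is buffered. */
    void onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        buffer.add(timestamp);
    }

    /** A periodic poll: the watermark depends only on timestamps already seen. */
    long pollWatermark() {
        long watermark = currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
        // Release every buffered event whose timestamp is smaller than the watermark.
        while (!buffer.isEmpty() && buffer.peek() < watermark) {
            processed.add(buffer.poll());
        }
        return watermark;
    }

    public static void main(String[] args) {
        CepBufferSimulation sim = new CepBufferSimulation();

        // First set of three events, followed by ten polls with no new input.
        sim.onEvent(1_000);
        sim.onEvent(1_001);
        sim.onEvent(1_002);
        for (int i = 0; i < 10; i++) {
            sim.pollWatermark();
        }
        System.out.println("after set 1: " + sim.processed); // after set 1: []

        // Second set, 20 seconds later: the watermark now passes set 1 only.
        sim.onEvent(21_000);
        sim.onEvent(21_001);
        sim.onEvent(21_002);
        sim.pollWatermark();
        System.out.println("after set 2: " + sim.processed); // after set 2: [1000, 1001, 1002]
    }
}
```

    Polling repeatedly after the first set changes nothing, because the watermark depends only on event timestamps; the first set is released only once the second set pushes the watermark past it, which reproduces the "offset" described in the question.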

    So one solution is to periodically generate watermarks that assume a fixed maximum lateness for incoming events. To do this, set the interval with setAutoWatermarkInterval on the ExecutionConfig:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ..
    ExecutionConfig executionConfig = env.getConfig();
    executionConfig.setAutoWatermarkInterval(1000L);
    

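    This makes Flink call the watermark generator periodically at the given interval (here, every second) and poll it for a new watermark. Setting the time characteristic to EventTime already sets this interval to 200 ms by default; the call above changes it to one second.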

    Furthermore, the timestamp/watermark generator has to emit advancing watermarks even when no new events flow in. For this, I adapted BoundedOutOfOrdernessTimestampExtractor.java, which ships with Flink:

    import java.time.Instant;

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {

        private static final long serialVersionUID = 1L;

        /** The current maximum timestamp seen so far (from events or the wall clock). */
        private long currentMaxTimestamp;

        /** The timestamp of the last emitted watermark. */
        private long lastEmittedWatermark = Long.MIN_VALUE;

        /**
         * The (fixed) interval between the maximum timestamp seen
         * and that of the watermark to be emitted.
         */
        private final long maxOutOfOrderness;

        public BoundedOutOfOrdernessGenerator() {
            Time maxOutOfOrderness = Time.seconds(5);

            if (maxOutOfOrderness.toMilliseconds() < 0) {
                throw new RuntimeException("Tried to set the maximum allowed " + "lateness to " + maxOutOfOrderness
                        + ". This parameter cannot be negative.");
            }
            this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
            this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
        }

        public long getMaxOutOfOrdernessInMillis() {
            return maxOutOfOrderness;
        }

        /**
         * Extracts the timestamp from the given element.
         *
         * @param element The element that the timestamp is extracted from.
         * @return The new timestamp.
         */
        public long extractTimestamp(Event element) {
            return element.getOccurrenceTimeStamp();
        }

        @Override
        public final Watermark getCurrentWatermark() {
            long nowTimestampMillis = Instant.now().toEpochMilli();

            // Use the wall clock as a lower bound for the reference time, so the
            // watermark keeps advancing (to now - maxOutOfOrderness) without new events.
            if (nowTimestampMillis > currentMaxTimestamp) {
                currentMaxTimestamp = nowTimestampMillis;
            }

            // this guarantees that the watermark never goes backwards.
            long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
            if (potentialWM >= lastEmittedWatermark) {
                lastEmittedWatermark = potentialWM;
            }
            return new Watermark(lastEmittedWatermark);
        }

        @Override
        public final long extractTimestamp(Event element, long previousElementTimestamp) {
            long timestamp = extractTimestamp(element);
            if (timestamp > currentMaxTimestamp) {
                currentMaxTimestamp = timestamp;
            }
            return timestamp;
        }
    }
    

    As you can see in getCurrentWatermark(), I take the current epoch timestamp (or the highest event timestamp seen, if that is larger), subtract the maximum lateness we expect, and create a watermark from the result.
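
    That computation can be checked without a running job. The following is a plain-Java stand-in, not the Flink interface: the hypothetical class WallClockWatermark receives the current time as a parameter instead of calling Instant.now(), so its output is deterministic. It assumes the intended watermark is the larger of the highest event timestamp and the current time, minus the maximum lateness, and that the watermark must never move backwards.

```java
/**
 * Plain-Java sketch of a wall-clock-driven bounded-out-of-orderness watermark.
 * Hypothetical helper, not part of Flink; the current time is passed in.
 */
class WallClockWatermark {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    WallClockWatermark(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
    }

    /** Called for every event, like extractTimestamp(). */
    long onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    /** Called on every periodic poll, like getCurrentWatermark(). */
    long watermarkAt(long nowMillis) {
        // Advance with the clock even when no events arrive.
        long reference = Math.max(currentMaxTimestamp, nowMillis);
        long candidate = reference - maxOutOfOrderness;
        // Never let the watermark go backwards.
        lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        WallClockWatermark wm = new WallClockWatermark(5_000);
        wm.onEvent(100_000);
        System.out.println(wm.watermarkAt(100_000)); // 95000
        System.out.println(wm.watermarkAt(106_000)); // 101000: the event is now behind the watermark
        System.out.println(wm.watermarkAt(103_000)); // 101000: an earlier clock value does not lower it
    }
}
```

    The last call shows the monotonicity guard: a poll with an earlier clock value keeps the previously emitted watermark.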

    Together, Flink now polls for a new watermark every second, and the watermark always "lags" 5 seconds behind the current time. This allows events to be matched against the defined patterns roughly 5 seconds (plus at most one polling interval) after the last event was received.
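
    The delay can be estimated with a little arithmetic. Assuming no newer events arrive (so the watermark at poll time now is now - maxOutOfOrderness), that polls happen on a fixed grid every autoWatermarkInterval milliseconds, and that an event is processed once its timestamp is smaller than the watermark, the hypothetical helper below returns the first poll that releases an event.

```java
/** Hypothetical helper: when does an idle stream release a buffered event? */
class ReleaseDelay {
    /**
     * Returns the first poll time (a multiple of intervalMillis) at which the
     * watermark nowMillis - boundMillis is strictly greater than eventTs.
     */
    static long firstReleasingPoll(long eventTs, long boundMillis, long intervalMillis) {
        long needed = eventTs + boundMillis + 1; // smallest "now" with now - bound > eventTs
        long polls = (needed + intervalMillis - 1) / intervalMillis; // ceiling division
        return polls * intervalMillis;
    }

    public static void main(String[] args) {
        long eventTs = 10_500;
        long poll = firstReleasingPoll(eventTs, 5_000, 1_000);
        // released at 16000 ms, 5500 ms after the event
        System.out.println("released at " + poll + " ms, " + (poll - eventTs) + " ms after the event");
    }
}
```

    With a 5 second bound and a 1 second interval, the release therefore happens more than 5 and at most 6 seconds after the event's timestamp.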

    Whether this works depends on your scenario, because it also means that events whose timestamps are already more than 5 seconds in the past when Flink receives them are behind the watermark; they are discarded and not processed any more.
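
    The trade-off follows from the same watermark rule. A watermark with timestamp t declares that no more elements with a timestamp of t or less are expected, so such elements count as late. Assuming the watermark at arrival is max(highest timestamp seen, arrival time) - maxOutOfOrderness, the hypothetical check below shows which events would be treated as late:

```java
/** Hypothetical check: would an event be treated as late when it arrives? */
class LatenessCheck {
    static boolean isLate(long eventTs, long arrivalMillis, long maxSeenTimestamp, long boundMillis) {
        // Same rule as the wall-clock generator: max(seen, now) - bound.
        long watermark = Math.max(maxSeenTimestamp, arrivalMillis) - boundMillis;
        // An element at or below the current watermark is late.
        return eventTs <= watermark;
    }

    public static void main(String[] args) {
        long now = 100_000;
        // Occurred 2 s before it reached Flink: still ahead of the watermark.
        System.out.println(isLate(now - 2_000, now, now - 2_000, 5_000)); // false
        // Occurred 7 s before it reached Flink: already behind the watermark.
        System.out.println(isLate(now - 7_000, now, now - 2_000, 5_000)); // true
    }
}
```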