Tags: apache-flink, flink-streaming, flink-cep

Is the description of "Contiguity within looping patterns" in the Flink CEP documentation correct?


The Flink CEP documentation describes three contiguity modes:

  1. Strict Contiguity: Expects all matching events to appear strictly one after the other, without any non-matching events in-between.
  2. Relaxed Contiguity: Ignores non-matching events appearing in-between the matching ones.
  3. Non-Deterministic Relaxed Contiguity: Further relaxes contiguity, allowing additional matches that ignore some matching events.

The first example in the documentation is easy to understand.

Given the pattern "a b"

and the input "a", "c", "b1", "b2", the results are:

  1. Strict Contiguity output : {} (no match)
  2. Relaxed Contiguity output : {a b1}
  3. Non-Deterministic Relaxed Contiguity output : {a b1}, {a b2}
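To make the three results above concrete, here is a minimal plain-Java sketch of the two-step pattern "a b". It is not Flink's NFA and uses no Flink classes; the class `ContiguitySketch` and its methods are hypothetical names that only mirror the documented definitions, and matching is assumed to be a simple `startsWith` check on each event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the three contiguity modes for the pattern "a b".
// Hypothetical helper, not part of Flink; it only mirrors the documented definitions.
class ContiguitySketch {

    // Strict: the event immediately after an "a" must be a "b".
    static List<String> strict(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).startsWith("a") && events.get(i + 1).startsWith("b")) {
                matches.add(events.get(i) + " " + events.get(i + 1));
            }
        }
        return matches;
    }

    // Relaxed: skip non-matching events and take the first "b" after each "a".
    static List<String> relaxed(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("a")) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).startsWith("b")) {
                    matches.add(events.get(i) + " " + events.get(j));
                    break; // stop at the first matching "b"
                }
            }
        }
        return matches;
    }

    // Non-deterministic relaxed: pair each "a" with every later "b".
    static List<String> relaxedAny(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("a")) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).startsWith("b")) {
                    matches.add(events.get(i) + " " + events.get(j));
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "c", "b1", "b2");
        System.out.println("strict:     " + strict(input));     // []
        System.out.println("relaxed:    " + relaxed(input));    // [a b1]
        System.out.println("relaxedAny: " + relaxedAny(input)); // [a b1, a b2]
    }
}
```

In the Flink CEP API these three modes correspond to `next()`, `followedBy()` and `followedByAny()` between patterns.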

But the example under "Contiguity within looping patterns" is much harder to follow.

Given the pattern "a b+ c"

and the input "a", "b1", "d1", "b2", "d2", "b3", "c", the documentation lists these results:

  1. Strict Contiguity: {a b3 c}
  2. Relaxed Contiguity: {a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}
  3. Non-Deterministic Relaxed Contiguity: {a b1 c}, {a b1 b2 c}, {a b1 b3 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}

The documented Strict Contiguity output is {a b3 c}, but this contradicts the definition of Strict Contiguity: several events ("b1", "d1", "b2", "d2") sit between "a" and "b3", so the matched events do not appear strictly one after the other.


Solution

  • I believe you are right. With strict contiguity, the pattern does not match this input at all. I wrote the following example to make sure:

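    import java.util.List;
    import java.util.Map;
    
    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    
    public class StreamingJob {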
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<String> events = env.fromElements("a", "b1", "d1", "b2", "d2", "b3", "c");
    
            AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
            Pattern<String, String> pattern =
                    Pattern.<String>begin("a", skipStrategy)
                            .where(
                                    new SimpleCondition<String>() {
                                        @Override
                                        public boolean filter(String element) throws Exception {
                                            return element.startsWith("a");
                                        }
                                    })
                            .next("b+")
                            .where(
                                    new SimpleCondition<String>() {
                                        @Override
                                        public boolean filter(String element) throws Exception {
                                            return element.startsWith("b");
                                        }
                                    })
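                            // consecutive() applies strict contiguity inside the loop:
                            // the matched "b" events must be adjacent to each other.
                            .oneOrMore().consecutive()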
                            .next("c")
                            .where(
                                    new SimpleCondition<String>() {
                                        @Override
                                        public boolean filter(String element) throws Exception {
                                            return element.startsWith("c");
                                        }
                                    });
    
            PatternStream<String> patternStream = CEP.pattern(events, pattern).inProcessingTime();
            patternStream.select(new SelectSegment()).addSink(new PrintSinkFunction<>(true));
            env.execute();
        }
    
        public static class SelectSegment implements PatternSelectFunction<String, String> {
            public String select(Map<String, List<String>> pattern) {
                return String.join("", pattern.get("a"))
                        + String.join("", pattern.get("b+"))
                        + String.join("", pattern.get("c"));
            }
        }
    }
    

    I've created FLINK-27456 to track this.
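The "no match" result can also be sanity-checked without a Flink runtime. The sketch below is an assumption-laden model, not Flink's matcher: it treats full strict contiguity as plain adjacency, reduces each event to its first letter, and checks the regular expression `ab+c` against the encoded stream. The class `StrictLoopCheck` and its methods are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: models "a b+ c" under full strict
// contiguity as an unbroken run of adjacent events.
class StrictLoopCheck {

    // Reduce each event to its first letter ("b1" -> "b") and concatenate,
    // so the sample input becomes "abdbdbc".
    static String encode(List<String> events) {
        StringBuilder sb = new StringBuilder();
        for (String event : events) {
            sb.append(event.charAt(0));
        }
        return sb.toString();
    }

    // True if some contiguous slice of the stream is "a", one or more "b", then "c".
    static boolean matchesStrict(List<String> events) {
        return Pattern.compile("ab+c").matcher(encode(events)).find();
    }

    public static void main(String[] args) {
        List<String> fromQuestion = Arrays.asList("a", "b1", "d1", "b2", "d2", "b3", "c");
        List<String> adjacent = Arrays.asList("a", "b1", "b2", "c");
        System.out.println(matchesStrict(fromQuestion)); // false: "d1" breaks the run after "b1"
        System.out.println(matchesStrict(adjacent));     // true
    }
}
```

Under this model the input from the question yields no match, which agrees with the Flink job above.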