The Flink CEP documentation describes the contiguity modes as follows:
Strict Contiguity: Expects all matching events to appear strictly one after the other, without any non-matching events in-between.
Relaxed Contiguity: Ignores non-matching events appearing in-between the matching ones.
Non-Deterministic Relaxed Contiguity: Further relaxes contiguity, allowing additional matches that ignore some matching events.
The first example is easy to understand:
Given the pattern "a b" and the input "a", "c", "b1", "b2", the results are:
Strict Contiguity: {} (no match)
Relaxed Contiguity: {a b1}
Non-Deterministic Relaxed Contiguity: {a b1}, {a b2}
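To make the three modes concrete, here is a minimal plain-Java sketch that reproduces the results above for the two-step pattern "a b". It is a simplified model only, not Flink's NFA: the class `ContiguityModel`, the enum `Contiguity`, and the method `match` are hypothetical names introduced for illustration, and the sketch assumes events are classified by their first letter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of contiguity for the fixed pattern "a b" (not Flink's implementation).
class ContiguityModel {
    enum Contiguity { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

    // Returns every "a b" match found in the input under the given contiguity mode.
    static List<String> match(List<String> input, Contiguity mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            if (!input.get(i).startsWith("a")) {
                continue; // a match can only start at an "a" event
            }
            for (int j = i + 1; j < input.size(); j++) {
                boolean isB = input.get(j).startsWith("b");
                if (mode == Contiguity.STRICT) {
                    // Only the event directly after "a" is considered.
                    if (isB) {
                        matches.add(input.get(i) + " " + input.get(j));
                    }
                    break;
                }
                if (isB) {
                    matches.add(input.get(i) + " " + input.get(j));
                    if (mode == Contiguity.RELAXED) {
                        break; // skip non-matching events, stop at the first matching "b"
                    }
                    // NON_DETERMINISTIC_RELAXED keeps going and also pairs "a" with later "b" events.
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "c", "b1", "b2");
        System.out.println(match(input, Contiguity.STRICT));                    // []
        System.out.println(match(input, Contiguity.RELAXED));                   // [a b1]
        System.out.println(match(input, Contiguity.NON_DETERMINISTIC_RELAXED)); // [a b1, a b2]
    }
}
```

In the Flink Pattern API, these modes correspond to `next()`, `followedBy()`, and `followedByAny()` respectively.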
But the example under "Contiguity within looping patterns" is really hard to understand:
Given the pattern "a b+ c" and the input "a", "b1", "d1", "b2", "d2", "b3", "c", the documentation gives {a b3 c} as the Strict Contiguity output. This contradicts the description of Strict Contiguity, since there are non-matching events between a and b3.
I believe you are right. With strict contiguity, it does not match at all. I wrote the following example to make sure:
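import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Same input as in the documentation example.
        DataStream<String> events = env.fromElements("a", "b1", "d1", "b2", "d2", "b3", "c");

        AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();

        // Pattern "a b+ c" with strict contiguity everywhere:
        // next() between the steps and consecutive() inside the loop.
        Pattern<String, String> pattern =
                Pattern.<String>begin("a", skipStrategy)
                        .where(
                                new SimpleCondition<String>() {
                                    @Override
                                    public boolean filter(String element) throws Exception {
                                        return element.startsWith("a");
                                    }
                                })
                        .next("b+")
                        .where(
                                new SimpleCondition<String>() {
                                    @Override
                                    public boolean filter(String element) throws Exception {
                                        return element.startsWith("b");
                                    }
                                })
                        .oneOrMore().consecutive()
                        .next("c")
                        .where(
                                new SimpleCondition<String>() {
                                    @Override
                                    public boolean filter(String element) throws Exception {
                                        return element.startsWith("c");
                                    }
                                });

        PatternStream<String> patternStream = CEP.pattern(events, pattern).inProcessingTime();

        // Print every match to stderr.
        patternStream.select(new SelectSegment()).addSink(new PrintSinkFunction<>(true));
        env.execute();
    }

    // Concatenates the events captured by each pattern step into one string.
    public static class SelectSegment implements PatternSelectFunction<String, String> {
        @Override
        public String select(Map<String, List<String>> pattern) {
            return String.join("", pattern.get("a"))
                    + String.join("", pattern.get("b+"))
                    + String.join("", pattern.get("c"));
        }
    }
}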
I've created FLINK-27456 to track this.
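As a rough cross-check that needs no Flink dependency: when every step of "a b+ c" is strict, the pattern behaves like the regular expression `ab+c` applied to the sequence of event types. The sketch below is a simplified model under that assumption, not Flink's matching engine. The class `StrictLoopModel` and the method `matchesStrictly` are hypothetical names, events are assumed to be non-empty strings classified by their first letter, and skip strategies are ignored.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Simplified model: strict "a b+ c" as a regex over the first letters of the events.
class StrictLoopModel {
    private static final Pattern STRICT_A_BPLUS_C = Pattern.compile("ab+c");

    // Returns true if some contiguous run of events has the shape a, b..., c.
    static boolean matchesStrictly(List<String> events) {
        // Reduce each event to its type letter, e.g. "b1" -> "b" (assumes non-empty events).
        String types = events.stream()
                .map(e -> e.substring(0, 1))
                .collect(Collectors.joining());
        return STRICT_A_BPLUS_C.matcher(types).find();
    }

    public static void main(String[] args) {
        // Input from the question: "d1" directly follows "b1", so no strict match exists.
        System.out.println(matchesStrictly(Arrays.asList("a", "b1", "d1", "b2", "d2", "b3", "c"))); // false
        // Without the interleaved "d" events the strict pattern matches.
        System.out.println(matchesStrictly(Arrays.asList("a", "b1", "b2", "c"))); // true
    }
}
```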