I have a simple Flink application that tries to detect a pattern in an event stream created from the text file below:
1,A
2,B
3,C
4,A
5,C
6,B
7,D
8,D
9,A
10,D
I define the pattern this way:
Pattern<DataEvent, ?> pattern = Pattern.<DataEvent>begin("start")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("A");
}
}).next("middle")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("B");
}
}).followedBy("end")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("A");
}
});
I then ran the pattern detection using patternStream.process like this:
DataStream<DataEvent> result = patternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {
@Override
public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {
DataEvent startEvent = map.get("start").get(0);
DataEvent middleEvent = map.get("middle").get(0);
DataEvent endEvent = map.get("end").get(0);
collector.collect(new DataEvent( endEvent.getTimestamp(),
startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"));
}
});
and, alternatively, using patternStream.flatSelect like this:
DataStream<DataEvent> result = patternStream.flatSelect(
new PatternFlatSelectFunction<DataEvent, DataEvent>() {
@Override
public void flatSelect(Map<String, List<DataEvent>> map, Collector<DataEvent> collector) throws Exception {
DataEvent startEvent = map.get("start").get(0);
DataEvent middleEvent = map.get("middle").get(0);
DataEvent endEvent = map.get("end").get(0);
collector.collect(new DataEvent(
endEvent.getTimestamp(),
startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"
));
}
}
);
But the "result" event stream contains only the first matched pattern, not all of them. In both cases, the output file contains only this line:
4:A-B-A(1-2-4)
When I add oneOrMore() at the end of the pattern definition, the result is:
4:A-B-A(1-2-4)
4:A-B-A(1-2-4)
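A likely explanation for the duplicated line: with oneOrMore() on "end", the looping stage first takes A(4) and completes a match, then takes A(9), extends the same loop and completes a second match. So "end" holds [A(4)] in one match and [A(4), A(9)] in the other. Because processMatch/flatSelect only read map.get("end").get(0), both matches print as 4:A-B-A(1-2-4). The plain-Java sketch below is not Flink code: it hard-codes the original ten-line input, considers only the A(1)-B(2) prefix (the only A immediately followed by a B there), and assumes the loop uses its default relaxed inner contiguity. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class OneOrMoreDemo {

    // The question's original ten-line input, as parallel arrays.
    static final long[] TS = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    static final String[] TYPES = {"A", "B", "C", "A", "C", "B", "D", "D", "A", "D"};

    // "end" lists for the A(1)-B(2) prefix when "end" is A.oneOrMore() behind followedBy:
    // the loop takes the first later A, and every further A extends the loop and
    // completes one more match.
    static List<List<Long>> endLists() {
        List<List<Long>> matches = new ArrayList<>();
        List<Long> end = new ArrayList<>();
        for (int j = 2; j < TS.length; j++) { // index 2 = first event after B(2)
            if (!TYPES[j].equals("A")) continue;
            end.add(TS[j]);
            matches.add(new ArrayList<>(end)); // snapshot: each extension is its own match
        }
        return matches;
    }

    // Mirrors the question's formatting, which only reads map.get("end").get(0).
    static String firstOnly(List<Long> end) {
        return end.get(0) + ":A-B-A(1-2-" + end.get(0) + ")";
    }

    // Uses the last "end" event for the prefix and prints the whole "end" list.
    static String wholeList(List<Long> end) {
        long last = end.get(end.size() - 1);
        return last + ":A-B-A+(1-2-" + end + ")";
    }

    public static void main(String[] args) {
        for (List<Long> end : endLists()) {
            System.out.println(firstOnly(end) + "   vs   " + wholeList(end));
        }
    }
}
```

Iterating over the whole map.get("end") list, rather than reading only its first element, is what makes the two matches distinguishable.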
I expect the process or flatSelect function to emit every possible A-B-followedBy-A combination, which would be:
4:A-B-A(1-2-4)
9:A-B-A(1-2-9)
In addition, if I insert a new line "6,A" before the original line 6 (shifting the later timestamps by one), so that the input becomes:
1,A
2,B
3,C
4,A
5,C
6,A
7,B
8,D
9,D
10,A
11,D
the result is:
10:A-B-A(6-7-10)
4:A-B-A(1-2-4)
It looks as if pattern matching starts again from scratch after the first match is found. How can I fix this?
My complete code is this:
package org.example;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.io.File;
import java.io.FileInputStream;
import java.text.SimpleDateFormat;
import java.util.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import java.io.FileOutputStream;
import java.io.IOException;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class EventStreamCEP {
public static List<DataEvent> originalStream = new ArrayList<>();
public static List<DataEvent> complexEvents = new ArrayList<>();
public static void main(String[] args) throws Exception {
// Set up the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Define the input data format
TextInputFormat inputFormat = new TextInputFormat(new Path("/home/majidlotfian/flink/flink-quickstart/PLprivacy/input_folder/input.txt"));
// read the input data from a file
DataStream<DataEvent> eventStream = env.readFile(inputFormat, "/home/majidlotfian/flink/flink-quickstart/PLprivacy/input_folder/input.txt")
.map(new MapFunction<String, DataEvent>() {
@Override
public DataEvent map(String value) throws Exception {
// Parse the line into an event object
String[] fields = value.split(",");
// trim() tolerates lines such as "6, A"; Long.parseLong matches the long field
long timestamp = Long.parseLong(fields[0].trim());
String type = fields[1].trim();
DataEvent event = new DataEvent(timestamp,type);
//event.setTimestamp(timestamp);
return event;
}
})
// Assign timestamps and watermarks
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<DataEvent>() {
private long currentMaxTimestamp;
private final long maxOutOfOrderness = 10000; // 10 seconds
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(DataEvent element, long previousElementTimestamp) {
long timestamp = element.getTimestamp();
currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
return timestamp;
}
});
// Define a pattern to detect events in the stream
Pattern<DataEvent, ?> pattern = Pattern.<DataEvent>begin("start")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("A");
}
}).next("middle")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("B");
}
}).followedBy("end")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("A");
}
});
//pattern.oneOrMore();
// Create a pattern stream using the defined pattern
PatternStream<DataEvent> patternStream = CEP.pattern(eventStream, pattern);
/*
DataStream<DataEvent> result = patternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {
@Override
public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {
DataEvent startEvent = map.get("start").get(0);
DataEvent middleEvent = map.get("middle").get(0);
DataEvent endEvent = map.get("end").get(0);
collector.collect(new DataEvent( endEvent.getTimestamp(),
startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"));
}
});
*/
// flatSelect lets one match emit several records; it does not change which matches the pattern finds
DataStream<DataEvent> result = patternStream.flatSelect(
new PatternFlatSelectFunction<DataEvent, DataEvent>() {
@Override
public void flatSelect(Map<String, List<DataEvent>> map, Collector<DataEvent> collector) throws Exception {
DataEvent startEvent = map.get("start").get(0);
DataEvent middleEvent = map.get("middle").get(0);
DataEvent endEvent = map.get("end").get(0);
collector.collect(new DataEvent(
endEvent.getTimestamp(),
startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"
));
}
}
);
// print the matched events
result.print();
// write the matched patterns to a text file
String outputPath = "/home/majidlotfian/flink/flink-quickstart/PLprivacy/output_folder/output.txt";
result.map(new MapFunction<DataEvent, String>() {
@Override
public String map(DataEvent value) throws Exception {
return value.getTimestamp()+":"+value.getType();
}
})
.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE)
.setParallelism(1); // ensure that events are written in order
env.execute("EventStreamCEP");
}
}
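In order for the A-B-A(1-2-9) match to be found, I believe you need to use followedByAny rather than followedBy. Followed-by-any relaxes the contiguity requirement further (non-deterministic relaxed contiguity): it allows matches that skip over events which would themselves have matched, so the partial match A(1)-B(2) can be completed by A(4) and also by A(9).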
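With followedBy ("skip till next"), the first A after the B completes and consumes the partial match, so A(9) can never be paired with A(1)-B(2). With followedByAny ("skip till any"), the partial match stays alive and is paired with every later A. The plain-Java sketch below is not Flink code; it only mimics those two rules for this specific three-stage pattern, and the Event class and method names are hypothetical. Run on both inputs from the question, it also suggests that matching does not "restart": under followedBy, (1-2-4) and (6-7-10) are simply the only two matches in the second input.

```java
import java.util.ArrayList;
import java.util.List;

public class ContiguityDemo {

    // Minimal stand-in for the question's DataEvent (timestamp + type).
    static final class Event {
        final long ts;
        final String type;
        Event(long ts, String type) { this.ts = ts; this.type = type; }
    }

    // Parses "timestamp,type" lines, tolerating spaces such as "6, A".
    static List<Event> parse(String csv) {
        List<Event> events = new ArrayList<>();
        for (String line : csv.split("\n")) {
            String[] f = line.split(",");
            events.add(new Event(Long.parseLong(f[0].trim()), f[1].trim()));
        }
        return events;
    }

    // Matches of: A, next B, then A.
    // any == false models followedBy    (only the first A after the B)
    // any == true  models followedByAny (every A after the B)
    static List<String> matches(List<Event> events, boolean any) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            Event start = events.get(i);
            Event middle = events.get(i + 1); // next(): B must be the very next event
            if (!start.type.equals("A") || !middle.type.equals("B")) continue;
            for (int j = i + 2; j < events.size(); j++) {
                Event end = events.get(j);
                if (!end.type.equals("A")) continue; // relaxed contiguity: skip non-A events
                out.add(end.ts + ":A-B-A(" + start.ts + "-" + middle.ts + "-" + end.ts + ")");
                if (!any) break; // followedBy stops at the first matching A
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> original = parse("1,A\n2,B\n3,C\n4,A\n5,C\n6,B\n7,D\n8,D\n9,A\n10,D");
        List<Event> modified = parse("1,A\n2,B\n3,C\n4,A\n5,C\n6,A\n7,B\n8,D\n9,D\n10,A\n11,D");
        System.out.println("original, followedBy:    " + matches(original, false));
        System.out.println("original, followedByAny: " + matches(original, true));
        System.out.println("modified, followedBy:    " + matches(modified, false));
        System.out.println("modified, followedByAny: " + matches(modified, true));
    }
}
```

For the original input this yields [4:A-B-A(1-2-4)] versus [4:A-B-A(1-2-4), 9:A-B-A(1-2-9)]. The lists show which matches each mode produces; the order in which a real Flink job emits them may differ.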
Be aware that this is potentially dangerous. With followedByAny, the internal state machine can never stop looking for further matches for a partial match such as A(1)-B(2), so it never frees up that partial match and the state keeps growing. You'll need to find some way to constrain the matching engine, e.g., by specifying a within clause, or by adding a more constrained terminal node to the pattern.
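To illustrate why a bound matters, here is another plain-Java sketch (not Flink code) of A, next B, followedByAny A on the second, eleven-line input. It tracks each A-B prefix as a pending partial match: without a bound, every prefix is still pending after the last event, because followedByAny never retires it; with a window, a prefix is dropped once an event arrives windowMs or more after the prefix's first event. That pruning rule is my assumption about how within behaves, and all names are hypothetical. In the Flink job, the corresponding bound would be something like appending .within(Time.milliseconds(5)) to the pattern; 5 ms is an arbitrary value for this toy input, not a recommendation.

```java
import java.util.ArrayList;
import java.util.List;

public class WithinDemo {

    // The question's second (eleven-line) input, as parallel arrays.
    static final long[] TS = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
    static final String[] TYPES = {"A", "B", "C", "A", "C", "A", "B", "D", "D", "A", "D"};

    // Matches found, plus A-B prefixes still held in "state" after the last event.
    static final class Result {
        final List<String> matches = new ArrayList<>();
        final List<String> pending = new ArrayList<>();
    }

    // windowMs <= 0 means "no within clause".
    static Result run(long windowMs) {
        Result r = new Result();
        List<long[]> partials = new ArrayList<>(); // {startTs, middleTs} of each A-B prefix
        for (int i = 0; i < TS.length; i++) {
            final long ts = TS[i];
            // Assumed within semantics: drop prefixes whose first event is too old.
            if (windowMs > 0) {
                partials.removeIf(p -> ts - p[0] >= windowMs);
            }
            // followedByAny: every pending prefix matches this A and is NOT removed afterwards.
            if (TYPES[i].equals("A")) {
                for (long[] p : partials) {
                    r.matches.add(ts + ":A-B-A(" + p[0] + "-" + p[1] + "-" + ts + ")");
                }
            }
            // next(): a prefix exists only when an A is immediately followed by a B.
            boolean inWindow = windowMs <= 0 || (i > 0 && ts - TS[i - 1] < windowMs);
            if (TYPES[i].equals("B") && i > 0 && TYPES[i - 1].equals("A") && inWindow) {
                partials.add(new long[] {TS[i - 1], ts});
            }
        }
        for (long[] p : partials) {
            r.pending.add("A-B(" + p[0] + "-" + p[1] + ")");
        }
        return r;
    }

    public static void main(String[] args) {
        Result unbounded = run(0);
        Result bounded = run(5);
        System.out.println("no within: " + unbounded.matches + " pending " + unbounded.pending);
        System.out.println("within 5:  " + bounded.matches + " pending " + bounded.pending);
    }
}
```

Without a bound, both A-B(1-2) and A-B(6-7) remain pending forever. With the 5 ms window, they are pruned, and on this particular input the result happens to shrink to the same two matches that followedBy produced.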