Search code examples
Tags: pattern-matching, apache-flink, flink-cep

My Flink pattern-detection program only returns the first match, not all matched patterns


I have a simple Flink application that tries to detect a pattern in an event stream created from the text file below:

1,A
2,B
3,C
4,A
5,C
6,B
7,D
8,D
9,A
10,D

I define the pattern this way:

        // The pattern as originally written: "start" = an A event, "middle" = a B event that
        // must come immediately after it (next = strict contiguity), "end" = a later A event
        // (followedBy = relaxed contiguity: non-matching events in between are skipped).
        // NOTE(review): with followedBy, the first later A completes the A(1)-B(2) partial
        // match, so A-B-A(1-2-9) is never produced; followedByAny is needed for that.
        Pattern<DataEvent, ?> pattern = Pattern.<DataEvent>begin("start")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("A");
                    }
                }).next("middle")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("B");
                    }

                }).followedBy("end")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("A");
                    }

                });

and performed the pattern detection using patternStream.process this way:

// Alternative 1: processMatch() is invoked once per complete match. `map` holds the matched
// events keyed by pattern name; each name in this pattern matches a single event, so
// get(0) is that event. The emitted event carries the end event's timestamp and a type
// string such as "A-B-A(1-2-4)".
DataStream<DataEvent> result = patternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {

            @Override
            public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {


                DataEvent startEvent = map.get("start").get(0);
                DataEvent middleEvent = map.get("middle").get(0);
                DataEvent endEvent = map.get("end").get(0);
                collector.collect(new DataEvent( endEvent.getTimestamp(),
                        startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"));
            }
        });

and using patternStream.flatSelect this way:

// Alternative 2: flatSelect() receives the same per-match map (pattern name -> matched
// events) and builds the same output event: the end event's timestamp plus a type string
// such as "A-B-A(1-2-4)". Which API is used does not change which matches are found.
DataStream<DataEvent> result = patternStream.flatSelect(
                new PatternFlatSelectFunction<DataEvent, DataEvent>() {
                    @Override
                    public void flatSelect(Map<String, List<DataEvent>> map, Collector<DataEvent> collector) throws Exception {
                        DataEvent startEvent = map.get("start").get(0);
                        DataEvent middleEvent = map.get("middle").get(0);
                        DataEvent endEvent = map.get("end").get(0);
                        collector.collect(new DataEvent(
                                endEvent.getTimestamp(),
                                startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"
                        ));
                    }
                }
        );

But the "result" event stream only contains the first matched pattern, not all of them. In both cases the output file contains only this line:

4:A-B-A(1-2-4)

When I used oneOrMore() at the end of the pattern definition, the result was:

4:A-B-A(1-2-4)
4:A-B-A(1-2-4)

I expect the process or select function to return all possible combinations of A, then B, followed (later) by A, which are:

4:A-B-A(1-2-4)
4:A-B-A(1-2-9)

In addition, if I insert a new line "6,A" before the original line 6 (so the later events are renumbered), the input becomes:

1,A
2,B
3,C
4,A
5,C
6,A
7,B
8,D
9,D
10,A
11,D

the result is :

10:A-B-A(6-7-10)
4:A-B-A(1-2-4)

This suggests that pattern matching starts over from scratch after the first match is found. How can I fix this?

My complete code is this:

package org.example;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;


import org.apache.flink.streaming.api.windowing.time.Time;

import java.io.File;
import java.io.FileInputStream;
import java.text.SimpleDateFormat;
import java.util.*;

import org.apache.flink.cep.CEP;

import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

import org.apache.flink.streaming.api.windowing.time.Time;

import java.io.FileOutputStream;
import java.io.IOException;



import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


public class EventStreamCEP {

    public static List<DataEvent> originalStream = new ArrayList<>();
    public static List<DataEvent> complexEvents = new ArrayList<>();


    public static void main(String[] args) throws Exception {


        


        // Set up the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Define the input data format
        TextInputFormat inputFormat = new TextInputFormat(new Path("/home/majidlotfian/flink/flink-quickstart/PLprivacy/input_folder/input.txt"));

// read the input data from a file
        DataStream<DataEvent> eventStream = env.readFile(inputFormat, "/home/majidlotfian/flink/flink-quickstart/PLprivacy/input_folder/input.txt")
                .map(new MapFunction<String, DataEvent>() {
                    @Override
                    public DataEvent map(String value) throws Exception {
                        // Parse the line into an event object
                        String[] fields = value.split(",");
                        long timestamp = Integer.parseInt(fields[0]);
                        String type = fields[1];
                        DataEvent event = new DataEvent(timestamp,type);
                        //event.setTimestamp(timestamp);
                        return event;
                    }
                })

                // Assign timestamps and watermarks
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<DataEvent>() {
                    private long currentMaxTimestamp;
                    private final long maxOutOfOrderness = 10000; // 10 seconds

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    }

                    @Override
                    public long extractTimestamp(DataEvent element, long previousElementTimestamp) {
                        long timestamp = element.getTimestamp();
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                        return timestamp;
                    }
                });



        // Define a pattern to detect events in the stream
        Pattern<DataEvent, ?> pattern = Pattern.<DataEvent>begin("start")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("A");
                    }
                }).next("middle")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("B");
                    }

                }).followedBy("end")
                .where(new SimpleCondition<DataEvent>() {
                    @Override
                    public boolean filter(DataEvent dataEvent) throws Exception {
                        return dataEvent.getType().equals("A");
                    }

                });
        //pattern.oneOrMore();





        // Create a pattern stream using the defined pattern
        PatternStream<DataEvent> patternStream = CEP.pattern(eventStream, pattern);


        /*
        DataStream<DataEvent> result = patternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {

            @Override
            public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {


                DataEvent startEvent = map.get("start").get(0);
                DataEvent middleEvent = map.get("middle").get(0);
                DataEvent endEvent = map.get("end").get(0);
                collector.collect(new DataEvent( endEvent.getTimestamp(),
                        startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"));
            }
        });

         */


        // Use PatternFlatSelectFunction to get all matched patterns
        DataStream<DataEvent> result = patternStream.flatSelect(
                new PatternFlatSelectFunction<DataEvent, DataEvent>() {
                    @Override
                    public void flatSelect(Map<String, List<DataEvent>> map, Collector<DataEvent> collector) throws Exception {
                        DataEvent startEvent = map.get("start").get(0);
                        DataEvent middleEvent = map.get("middle").get(0);
                        DataEvent endEvent = map.get("end").get(0);
                        collector.collect(new DataEvent(
                                endEvent.getTimestamp(),
                                startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"
                        ));
                    }
                }
        );





// print the windowed event stream

        result.print();

        // write the matched patterns to a text file
        String outputPath = "/home/majidlotfian/flink/flink-quickstart/PLprivacy/output_folder/output.txt";
        result.map(new MapFunction<DataEvent, String>() {
                    @Override
                    public String map(DataEvent value) throws Exception {
                        return value.getTimestamp()+":"+value.getType();
                    }
                })
                .writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);  // ensure that events are written in order

        

        env.execute("EventStreamCEP");


}


Solution

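  • For 4:A-B-A(1-2-9) to match, I believe you need to use followedByAny rather than followedBy. With followedBy (relaxed contiguity), non-matching events are skipped, but the first matching A (at 4) completes the partial match A(1)-B(2), so that partial match is no longer available for A(9). Followed-by-any relaxes the contiguity requirement further and also allows matches that ignore matching events, so A(1)-B(2) is completed by both A(4) and A(9).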

    Be aware that this is dangerous. Because another A could always arrive later, the internal state machine can never stop looking for further matches and will never free the A-B partial match, so the state grows without bound. You need some way to constrain the matching engine, e.g. by adding a within(...) clause to the pattern, or by adding a more constrained terminal node to the pattern.