Search code examples
apache-flink, flink-cep

Why is the delay between the ingestion time of the last matched event and the time the CEP pattern matches and fires so long?


I wrote a very simple CEP pattern, which just matches the three-letter sequence 'abc'. However, even for such a simple pattern, Flink (tested with both 1.5 and 1.4.2) takes almost 1 second (sometimes around 0.5 seconds) between the ingestion time of the last matched event and the time CEP matches the pattern and fires.

The following is the test result. Please note the last two fields: ingestionTimestamp is the ingestion time of the matched event, and timestamp is the time at which CEP fired the matched pattern.

My question is: how can I improve the performance? Is there anything like setBufferTimeout that can be set for CEP? I tried setting the buffer timeout to 5 ms, but with no luck.

Test result:

3> Transport{prodId=411, from='a', to='b', ingestionTimestamp='1528384356501', timestamp='1528384357034'} Transport{prodId=411, from='b', to='c', ingestionTimestamp='1528384356502', timestamp='1528384357034'} Transport{prodId=411, from='c', to='d', ingestionTimestamp='1528384356505', timestamp='1528384357034'} 
3> Transport{prodId=415, from='a', to='b', ingestionTimestamp='1528384356530', timestamp='1528384357034'} Transport{prodId=415, from='b', to='c', ingestionTimestamp='1528384356532', timestamp='1528384357034'} Transport{prodId=415, from='c', to='d', ingestionTimestamp='1528384356534', timestamp='1528384357034'} 
3> Transport{prodId=419, from='a', to='b', ingestionTimestamp='1528384356549', timestamp='1528384357034'} Transport{prodId=419, from='b', to='c', ingestionTimestamp='1528384356549', timestamp='1528384357034'} Transport{prodId=419, from='c', to='d', ingestionTimestamp='1528384356554', timestamp='1528384357034'}    

The following is the code:

/**
 * Flink CEP demo job: reads {@link Transport} records from Kafka, stamps each record with the
 * timestamp Flink assigned to it, and prints every sequence of three consecutive records whose
 * {@code from} fields are {@code "a"}, then {@code "b..."}, then {@code "c..."} within 5 seconds.
 *
 * <p>Command-line arguments:
 * <ul>
 *   <li>{@code --input-topic} (required) Kafka topic to read from</li>
 *   <li>{@code --group-id} (required) Kafka consumer group id</li>
 *   <li>{@code --watermark-interval} (optional, milliseconds, default 1000) auto-watermark
 *       interval</li>
 * </ul>
 */
public class RetailerExampleKafka {

    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";

    /**
     * The initial source of our shipment.
     */
    private static final String SRC = "a";

    /** Auto-watermark interval used when {@code --watermark-interval} is not supplied. */
    private static final long DEFAULT_WATERMARK_INTERVAL_MS = 1000L;

    /**
     * Strictly contiguous three-step pattern: a record leaving {@link #SRC}, followed by one whose
     * origin starts with "b", followed by one whose origin starts with "c", all within 5 seconds.
     */
    private static final Pattern<Transport, ?> pattern = Pattern.<Transport>begin("start")
            .where(new SimpleCondition<Transport>() {
                private static final long serialVersionUID = 314415972814127035L;

                @Override
                public boolean filter(Transport value) {
                    return Objects.equals(value.getFrom(), SRC);
                }
            }).next("middle").where(new SimpleCondition<Transport>() {
                private static final long serialVersionUID = 6664468385615273240L;

                @Override
                public boolean filter(Transport value) {
                    return originStartsWith(value, "b");
                }
            }).next("end").where(new SimpleCondition<Transport>() {
                private static final long serialVersionUID = 5721311694340771858L;

                @Override
                public boolean filter(Transport value) {
                    return originStartsWith(value, "c");
                }
            }).within(Time.milliseconds(5000));

    /**
     * Null-safe prefix test on a record's origin, so a record without a {@code from} value simply
     * does not match instead of failing the job with a {@link NullPointerException}.
     */
    private static boolean originStartsWith(Transport transport, String prefix) {
        String origin = transport.getFrom();
        return origin != null && origin.startsWith(prefix);
    }

    /**
     * Builds and runs the job on a local environment.
     *
     * @param args command-line arguments, see the class documentation
     * @throws Exception if a required argument is missing or the job fails
     */
    public static void main(String[] args) throws Exception {
        // Validate the arguments first so a bad invocation fails before any environment is built.
        ParameterTool params = ParameterTool.fromArgs(args);
        String inputTopic = params.getRequired("input-topic");
        String groupID = params.getRequired("group-id");
        long watermarkIntervalMs = params.getLong("watermark-interval", DEFAULT_WATERMARK_INTERVAL_MS);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setBufferTimeout(5);
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.getConfig().disableSysoutLogging();
        // Must be set AFTER setStreamTimeCharacteristic, which installs its own default interval.
        // With ingestion time, CEP buffers events and only processes them when a watermark
        // arrives, so this interval is an upper bound on the match latency: lower it to get
        // matches sooner.
        env.getConfig().setAutoWatermarkInterval(watermarkIntervalMs);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", groupID);
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<Transport> consumer = new FlinkKafkaConsumer011<>(
                inputTopic,
                new TransportSchema(),
                kafkaProps);

        // NOTE(review): process() is applied after keyBy(), so the stream handed to CEP is
        // presumably no longer keyed and the pattern is not evaluated per product id.
        // Confirm whether per-product matching is intended.
        DataStream<Transport> rides = env.addSource(consumer)
                .keyBy(element -> element.getProductId())
                .process(new MatchFunction2());

        CEP.pattern(rides, pattern).flatSelect(new PatternFlatSelectFunction<Transport, String>() {
            private static final long serialVersionUID = -8972838879934875538L;

            @Override
            public void flatSelect(Map<String, List<Transport>> map, Collector<String> collector) {
                // One wall-clock reading per match: every event of a match reports the same fire time.
                long firedAt = System.currentTimeMillis();
                StringBuilder str = new StringBuilder();
                for (List<Transport> matched : map.values()) {
                    for (Transport t : matched) {
                        t.timestamp = firedAt;
                        str.append(t).append(' ');
                    }
                }
                collector.collect(str.toString());
            }
        }).print();
        env.execute();
    }

    /**
     * Our input records. Each contains:
     * 1. the id of the product,
     * 2. the starting location of the shipment, and
     * 3. the final location of the shipment.
     */
    public static class Transport {
        private final int prodId;
        private final String from;
        private final String to;
        /** Initially the value parsed from the input; overwritten with the fire time on a match. */
        private long timestamp;
        /** Timestamp Flink attached to the record; written by {@link MatchFunction2}. */
        public long ingestionTimestamp;

        public Transport(int productId, String from, String to, long timestamp) {
            this.prodId = productId;
            this.from = from;
            this.to = to;
            this.timestamp = timestamp;
        }

        public int getProductId() {
            return prodId;
        }

        public String getFrom() {
            return from;
        }

        public String getTo() {
            return to;
        }

        public long getTimestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            return "Transport{" +
                    "prodId=" + prodId +
                    ", from='" + from + '\'' +
                    ", to='" + to + '\'' +
                    ", ingestionTimestamp='" + ingestionTimestamp + '\'' +
                    ", timestamp='" + timestamp + '\'' +
                    '}';
        }

        /**
         * Parses a comma-separated line of the form {@code prodId,from,to,timestamp}.
         * Surrounding whitespace on the two numeric fields is ignored; fields after the fourth
         * are ignored.
         *
         * @param line the text to parse, must not be {@code null}
         * @return the parsed record
         * @throws IllegalArgumentException if the line has fewer than four fields or a numeric
         *         field is not a valid number ({@link NumberFormatException})
         */
        public static Transport fromString(String line) {
            Objects.requireNonNull(line, "line");
            String[] split = line.split(",");
            if (split.length < 4) {
                throw new IllegalArgumentException(
                        "Expected 'prodId,from,to,timestamp' but got: " + line);
            }
            return new Transport(
                    Integer.parseInt(split[0].trim()),
                    split[1],
                    split[2],
                    Long.parseLong(split[3].trim()));
        }
    }

    /**
     * Looks up the events already matched for {@code patternName} and summarises them.
     * Not referenced by the pattern defined in this class.
     *
     * @param ctx         condition context giving access to previously matched events
     * @param patternName name of the pattern stage to inspect
     * @return destination of the last matched event and the number of matched events
     */
    private static Tuple2<String, Integer> getLastDestinationAndStopCountForPattern(IterativeCondition.Context<Transport> ctx, String patternName) {
        return getLastDestinationAndStopCountForPattern(ctx.getEventsForPattern(patternName));
    }

    /**
     * Walks {@code events} and returns the {@code to} location of the last one together with the
     * number of events seen; {@code ("", 0)} when there are none.
     */
    private static Tuple2<String, Integer> getLastDestinationAndStopCountForPattern(Iterable<Transport> events) {
        Tuple2<String, Integer> locationAndStopCount = new Tuple2<>("", 0);

        for (Transport transport : events) {
            locationAndStopCount.f0 = transport.getTo();
            locationAndStopCount.f1++;
        }
        return locationAndStopCount;
    }

    /**
     * Copies the timestamp Flink attached to each record into
     * {@link Transport#ingestionTimestamp} and forwards the record unchanged otherwise.
     */
    public static class MatchFunction2 extends ProcessFunction<Transport, Transport> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(Transport ride, Context context, Collector<Transport> out) throws Exception {
            // The record timestamp can be null (e.g. under processing time); fall back to the
            // current processing time rather than failing on auto-unboxing.
            Long recordTimestamp = context.timestamp();
            ride.ingestionTimestamp = recordTimestamp != null
                    ? recordTimestamp
                    : context.timerService().currentProcessingTime();
            out.collect(ride);
        }
    }

}


Solution

  • This is caused by IngestionTime, which works like EventTime but with automatically generated timestamps and watermarks. Timestamps are assigned from the system time, and a watermark is generated once every watermark interval.

    In your case you call .setAutoWatermarkInterval(1000), so a watermark is generated only once per second. Because the order of events is crucial for the CEP library, incoming events are buffered, sorted when a watermark arrives, and only then processed. Hence the delay of up to 1 second; lowering the watermark interval reduces it.