java apache-flink flink-streaming flink-cep

Apache Flink CEP: how to pass in a time window based on an event value?


Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("error");
        }
    }).followedBy("end").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

Is there a way I can replace Time.seconds(10) with value.getSomeTimeField() that I pass in through the Event?


Solution

  • I guess you want to work in event time. For more on that, see the Flink docs on event time and the section on how to extract timestamps from elements.

    In your example you can do something like:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<Event> input = ...

    // assignTimestampsAndWatermarks returns a new stream; pass that one to CEP
    DataStream<Event> timestamped = input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {

        @Override
        public long extractAscendingTimestamp(Event element) {
            // event time taken from the event itself (Flink expects epoch milliseconds)
            return element.getSomeTimeField();
        }
    });

    CEP.pattern(timestamped, pattern).select(...)
    

    This way the events are ordered by their own timestamps before the pattern is evaluated, and the within(...) timeout is measured against that time field rather than against processing time.
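To make the event-time idea concrete, here is a rough plain-Java sketch of what "timeout measured against the time field" means. It does not use Flink at all. The class `EventTimeWindowSketch`, its `Event` type, the `timeField` name and the `errorFollowedByCritical` helper are all made up for illustration, and treating the window boundary as exclusive is an assumption of this sketch, not a statement about Flink's behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration only: no Flink classes are used here.
class EventTimeWindowSketch {

    // Hypothetical event type: a name plus the time carried by the event (epoch millis).
    static final class Event {
        final String name;
        final long timeField;

        Event(String name, long timeField) {
            this.name = name;
            this.timeField = timeField;
        }
    }

    // Returns true if a "critical" event comes after an "error" event and less than
    // windowMillis later, where "after" and "later" refer to the events' own time
    // field and not to the order in which the events arrived.
    static boolean errorFollowedByCritical(List<Event> arrived, long windowMillis) {
        // Order by the embedded timestamp (stable sort, ties keep arrival order).
        List<Event> sorted = new ArrayList<>(arrived);
        sorted.sort(Comparator.comparingLong((Event e) -> e.timeField));

        for (int i = 0; i < sorted.size(); i++) {
            if (!sorted.get(i).name.equals("error")) {
                continue;
            }
            long start = sorted.get(i).timeField;
            for (int j = i + 1; j < sorted.size(); j++) {
                Event later = sorted.get(j);
                if (later.timeField - start >= windowMillis) {
                    break; // outside the window (exclusive boundary is an assumption)
                }
                if (later.name.equals("critical")) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // "critical" arrives first, but its time field places it 3 s after "error".
        List<Event> outOfOrder = Arrays.asList(
                new Event("critical", 5_000L),
                new Event("error", 2_000L));
        System.out.println(errorFollowedByCritical(outOfOrder, 10_000L)); // prints "true"

        // Here the two events are 15 s apart in event time, so the 10 s window is missed.
        List<Event> tooLate = Arrays.asList(
                new Event("error", 0L),
                new Event("critical", 15_000L));
        System.out.println(errorFollowedByCritical(tooLate, 10_000L)); // prints "false"
    }
}
```

Note that the window length is still a fixed value here, just like `Time.seconds(10)` in the pattern; only the clock it is measured against comes from the event.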