Tags: apache-flink, flink-streaming, flink-cep

Flink CEP Event Not triggering


I have implemented a CEP pattern in Flink that works as expected when connected to a local Kafka broker. But when I connect to a cluster-based cloud Kafka setup, the Flink CEP pattern does not trigger.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// retain externalized checkpoints when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

I am using an AscendingTimestampExtractor:

consumer.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<ObjectNode>() {
      @Override
      public long extractAscendingTimestamp(ObjectNode objectNode) {
        long timestamp;
        Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
        timestamp = instant.toEpochMilli();
        return timestamp;
      }
    });

I am also getting this warning:

AscendingTimestampExtractor:140 - Timestamp monotony violated: 1594017872227 < 1594017873133

I also tried using AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks, but neither of them worked.
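
For reference, the AssignerWithPeriodicWatermarks attempt was roughly shaped like the sketch below, using Flink's BoundedOutOfOrdernessTimestampExtractor (the 5-second bound is just a placeholder value):

consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(5)) {
      @Override
      public long extractTimestamp(ObjectNode objectNode) {
        // same event-time field as in the ascending extractor above
        return Instant.parse(objectNode.get("value").get("timestamp").asText())
            .toEpochMilli();
      }
    });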

I have attached a Flink console screenshot where no watermark is being assigned, plus an updated Flink console screenshot.

Could anyone help?


Solution

  • CEP must first sort the input stream(s), which it does based on the watermarking. So the problem could be with watermarking, but you haven't shown us enough to debug the cause. One common issue is having an idle source, which can prevent the watermarks from advancing.

    But there are other possible causes. To debug the situation, I suggest you look at some metrics, either in the Flink Web UI or in a metrics system if you have one connected. To begin, check if records are flowing, by looking at numRecordsIn, numRecordsOut, or numRecordsInPerSecond and numRecordsOutPerSecond at different stages of your pipeline.

    If there are events, then look at currentOutputWatermark throughout the different tasks of your job to see if event time is advancing.

    Update:

    It appears you may be calling assignTimestampsAndWatermarks on the Kafka consumer, which will result in per-partition watermarking. In that case, if you have an idle partition, that partition won't produce any watermarks, and that will hold back the overall watermark. Try calling assignTimestampsAndWatermarks on the DataStream produced by the source instead, to see if that fixes things. (Of course, without per-partition watermarking, you won't be able to use an AscendingTimestampExtractor, since the stream won't be in order.)
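
    For example, here is a minimal sketch of stream-level watermarking with the legacy assigner API used in the question (the 10-second out-of-orderness bound is an assumption you would need to tune, and the events variable name is just illustrative):

        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
        import org.apache.flink.streaming.api.windowing.time.Time;
        // plus the ObjectNode and Instant imports already used in the question

        // Watermark the DataStream produced by the source rather than the consumer,
        // so watermarks are no longer generated per Kafka partition.
        DataStream<ObjectNode> events = env
            .addSource(consumer)
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.seconds(10)) {
                  @Override
                  public long extractTimestamp(ObjectNode objectNode) {
                    return Instant.parse(objectNode.get("value").get("timestamp").asText())
                        .toEpochMilli();
                  }
                });

    A bounded-out-of-orderness extractor is used here instead of AscendingTimestampExtractor because, as noted above, the merged stream is no longer guaranteed to be in timestamp order.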