
Why does Flink CEP not match the pattern?


Flink newbie here.

I have a local setup where Flink processes JSON events generated by Debezium. The idea is to monitor conversations happening between a customer support agent and a customer. If the customer support agent does not reply to a message within a certain time frame, I'd like to raise an alert.

My Flink CEP code looks as follows:

    // We create a watermark strategy
    WatermarkStrategy<MessageEvent> watermarkStrategy = WatermarkStrategy
            .<MessageEvent>forMonotonousTimestamps()
            .withTimestampAssigner((event, ts) -> event.getPayload().getAfter().getCreatedAt().longValue());

    // We get our inputs here
    KafkaSource<String> msgTopic = Sources.kafkaSource("message", "flink");
    DataStream<String> input = env.fromSource(msgTopic, WatermarkStrategy.noWatermarks(), "Message Source");

    // Convert the Debezium payload into MessageEvent and assign watermarks
    // Partition by thread ID
    DataStream<MessageEvent> msgStream = input.map(new MessageMapper())
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .keyBy(msg -> msg.getPayload().getAfter().getThreadId());

    Pattern<MessageEvent, ?> pattern = Pattern.<MessageEvent>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
            .where(new MessageEventDirectionFilter("INCOMING"))
            .oneOrMore()
            .greedy()
            .notNext("end")
            .where(new MessageEventDirectionFilter("OUTGOING"))
            .within(Time.seconds(10));

    CEP.pattern(msgStream, pattern)
            .inEventTime()
            .process(new IdleConversationProcessFunction())
            .print();

INCOMING messages are those sent by the customer, whereas OUTGOING messages are those sent by the support agent. What I am trying to capture is one or more INCOMING messages that are not followed by an OUTGOING reply within 10 seconds. I can see records being sent to the STDOUT operator, but nothing shows up in the task manager logs.


Why does the pattern not match?


Solution

  • The PrintSink does not write to the task manager logs. Instead, it writes to separate files (with names ending in .out) that are also located in the log directory. Could it be that the pattern matched, but you overlooked the output?

    BTW, you should ignore the fact that the web UI shows 0 records sent from the print sink to STDOUT. The metrics showing records (and bytes) sent (and received) only include traffic flowing through Flink's internal network stack. Connections to external systems (sources and sinks) are not included in these metrics.

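    However, I doubt notNext supports being used in this fashion (as the last step of the pattern, combined with within), so I wouldn't be surprised if there truly is no output.

    What will work is to define the pattern so that it matches the case where the reply does arrive in time, and then capture the cases you really want (i.e., when the reply times out) with the processTimedOutMatch method, which you get by implementing the TimedOutPartialMatchHandler interface on your PatternProcessFunction.

    See "Handling Timed Out Partial Patterns" in the Flink CEP documentation for more details.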
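
To make the idea of "match the reply, alert on the timeout" more concrete, here is a minimal plain-Java sketch of the logic for a single thread. It is not Flink code and does not use the CEP API: `Msg`, `Result`, `classify`, and the `finalWatermarkMs` parameter are hypothetical names invented for illustration. It assumes events arrive in timestamp order, treats the first unanswered INCOMING message as the start of a partial match, and counts a reply landing exactly on the window boundary as timed out (a simplification chosen for this sketch). In a real job, the "replied" bucket would correspond to what arrives in processMatch and the "timed out" bucket to what arrives in processTimedOutMatch, with Flink's watermarks driving the expiry.

```java
import java.util.ArrayList;
import java.util.List;

public class IdleConversationSketch {

    /** Hypothetical stand-in for the question's MessageEvent. */
    static final class Msg {
        final String direction;   // "INCOMING" or "OUTGOING"
        final long timestampMs;   // event-time timestamp
        Msg(String direction, long timestampMs) {
            this.direction = direction;
            this.timestampMs = timestampMs;
        }
    }

    /** Start timestamps of conversations that were answered in time vs. timed out. */
    static final class Result {
        final List<Long> repliedInTime = new ArrayList<>();
        final List<Long> timedOut = new ArrayList<>();
    }

    /**
     * Classifies the events of ONE thread, given in timestamp order.
     * finalWatermarkMs models how far event time has advanced after the last event.
     */
    static Result classify(List<Msg> orderedEvents, long windowMs, long finalWatermarkMs) {
        Result result = new Result();
        Long pendingStart = null; // first INCOMING message still waiting for a reply

        for (Msg m : orderedEvents) {
            // Event time has reached m.timestampMs: expire a pending start whose window is over.
            if (pendingStart != null && m.timestampMs - pendingStart >= windowMs) {
                result.timedOut.add(pendingStart);
                pendingStart = null;
            }
            if ("INCOMING".equals(m.direction)) {
                if (pendingStart == null) {
                    pendingStart = m.timestampMs; // open a new partial match
                }
            } else if ("OUTGOING".equals(m.direction) && pendingStart != null) {
                result.repliedInTime.add(pendingStart); // reply arrived inside the window
                pendingStart = null;
            }
        }
        // Nothing else arrives, but event time may still pass the end of the window.
        if (pendingStart != null && finalWatermarkMs - pendingStart >= windowMs) {
            result.timedOut.add(pendingStart);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Msg> events = new ArrayList<>();
        events.add(new Msg("INCOMING", 0L));
        events.add(new Msg("OUTGOING", 4_000L));   // answered after 4 s
        events.add(new Msg("INCOMING", 20_000L));
        events.add(new Msg("INCOMING", 25_000L));  // customer writes again, still no reply
        events.add(new Msg("INCOMING", 31_000L));  // the window opened at 20 s has expired by now

        Result r = classify(events, 10_000L, 50_000L);
        System.out.println("replied in time, started at: " + r.repliedInTime);
        System.out.println("timed out, started at:       " + r.timedOut);
    }
}
```

Note that in this sketch a timeout is only noticed when a later event or the final watermark moves time forward. Event-time timeouts in Flink depend on watermarks in a similar way, so a thread whose input goes quiet will not raise an alert until the watermark passes the end of the window.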