
Measuring event-time latency with Flink CEP


I have implemented a pattern with Flink CEP that matches three events in sequence, such as A->B->C. After defining the pattern, I create a

PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);

with a PatternSelectFunction such that

patternStream.select(new MyPatternSelectFunction()).print();

This works like a charm, but I am interested in the event time of all matched events. I know that the traditional Flink streaming API offers rich functions, which allow you to register Flink's internal latency tracker as described in this question. I have also seen that a new RichPatternSelectFunction was added in Flink 1.8. Unfortunately, I cannot set up Flink 1.8 with Flink CEP.

Finally, is there a way to get the event-time of all matched events?


Solution

  • You don't need Rich Functions to use Flink's latency tracking. You just need to enable it by setting latencyTrackingInterval to a positive number in either the Flink configuration or ExecutionConfig, e.g.,

    env.getConfig().setLatencyTrackingInterval(1000);
    

    and then you can observe the results in your metrics solution, or via the REST API (latency metrics are not reported in the Flink web UI).

    Documentation

    Update:

    The latency statistics are job metrics, and are in the list returned by

    http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics
    

    Latency metric values can be fetched from

    http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics?get=<metric_name>
    

    These metrics have names like

    latency.source_id.<ID>.operator_id.<ID>.operator_subtask_index.<SUBTASK>.<metric>
    

    where the IDs identify the source and operator nodes in the job graph between which the latency is being measured.

    For example, I can determine the 95th percentile latency between the source and one of the sinks in a job I am running right now with this request:

    http://localhost:8081/jobs/94b189a96b98b3aafaba6db6aa8b770b/metrics?get=latency.source_id.bc764cd8ddf7a0cff126f51c16239658.operator_id.fd0ee602f2fa8d310d9bd9f694e185f5.operator_subtask_index.0.latency_p95
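
The same request can also be issued from code. What follows is only a minimal sketch using the JDK's built-in HTTP client (Java 11+). The class and method names (`LatencyMetricQuery`, `metricName`, `metricUrl`) are made up for illustration, and the endpoint, job ID, and operator IDs are the ones from the example request above, so substitute your own.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LatencyMetricQuery {

    /** Assembles a latency metric name in the naming scheme quoted above. */
    static String metricName(String sourceId, String operatorId, int subtaskIndex, String stat) {
        return "latency.source_id." + sourceId
                + ".operator_id." + operatorId
                + ".operator_subtask_index." + subtaskIndex
                + "." + stat;
    }

    /** Assembles the REST URL that fetches a single job metric by name. */
    static String metricUrl(String restEndpoint, String jobId, String metricName) {
        return restEndpoint + "/jobs/" + jobId + "/metrics?get=" + metricName;
    }

    public static void main(String[] args) throws Exception {
        // IDs copied from the example request; replace them with those of your own job.
        String name = metricName(
                "bc764cd8ddf7a0cff126f51c16239658",
                "fd0ee602f2fa8d310d9bd9f694e185f5",
                0,
                "latency_p95");
        String url = metricUrl("http://localhost:8081", "94b189a96b98b3aafaba6db6aa8b770b", name);

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

        // Print the raw response body; parsing it is left out of this sketch.
        System.out.println(response.body());
    }
}
```

Running `main` requires a reachable JobManager with latency tracking enabled; the two helper methods only build strings and can be reused with any HTTP client.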
    

    Alternatively, you could use a ProcessFunction to add processing time timestamps to your events before they enter the CEP part of your job, and then use another ProcessFunction afterwards to measure the elapsed time.
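
A rough sketch of that ProcessFunction approach might look like the following. It is not taken from the original job: the `Event` class, its `ingestTime` field, and the class names `StampIngest` and `MeasureElapsed` are hypothetical, and it assumes the PatternSelectFunction emits the last matched event (C) so that the second function can read the stamp back.

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CepLatencyProbes {

    /** Hypothetical event type; only ingestTime matters for this sketch. */
    public static class Event {
        public String name;
        public long ingestTime; // processing time (ms) at which the event entered the CEP stage

        public Event() {}

        public Event(String name) {
            this.name = name;
        }
    }

    /** Stamps each event with the current processing time before it reaches CEP. */
    public static class StampIngest extends ProcessFunction<Event, Event> {
        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) {
            // Mutates the incoming event for brevity; copy it if other operators read the same stream.
            event.ingestTime = ctx.timerService().currentProcessingTime();
            out.collect(event);
        }
    }

    /** Emits the milliseconds elapsed between the ingest stamp and the arrival of the match. */
    public static class MeasureElapsed extends ProcessFunction<Event, Long> {
        @Override
        public void processElement(Event lastMatched, Context ctx, Collector<Long> out) {
            long now = ctx.timerService().currentProcessingTime();
            out.collect(elapsedMillis(lastMatched.ingestTime, now));
        }
    }

    /** Pure helper so the arithmetic can be checked without a running job. */
    static long elapsedMillis(long ingestTime, long now) {
        return now - ingestTime;
    }
}
```

Under these assumptions the wiring would be `eventStream.process(new CepLatencyProbes.StampIngest())` ahead of `CEP.pattern(...)`, and `.process(new CepLatencyProbes.MeasureElapsed())` on the stream returned by `patternStream.select(...)`. Because both stamps are processing-time readings, they are only directly comparable when the two operators run on machines with synchronized clocks.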