I have implemented a pattern with Flink CEP that matches three events, such as A->B->C. After defining my pattern, I create a
PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);
and apply a PatternSelectFunction to it, such that
patternStream.select(new MyPatternSelectFunction()).print();
This works like a charm, but I am interested in the event-time of all matched events. I know that the traditional Flink streaming API offers rich functions, which allow you to register Flink's internal latency tracker as described in this question. I have also seen that a new RichPatternSelectFunction has been added for Flink 1.8. Unfortunately, I cannot set up Flink 1.8 with Flink CEP.
So, is there a way to get the event-time of all matched events?
You don't need rich functions to use Flink's latency tracking. You just need to enable it by setting latencyTrackingInterval to a positive number in either the Flink configuration or the ExecutionConfig, e.g.,
env.getConfig().setLatencyTrackingInterval(1000);
and then you can observe the results in your metrics solution or via the REST API (latency metrics are not reported in the Flink web UI).
Update:
The latency statistics are job metrics, and are in the list returned by
http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics
Latency metric values can be fetched from
http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics?get=<metric_name>
These metrics have names like
latency.source_id.<ID>.operator_id.<ID>.operator_subtask_index.<SUBTASK>.<metric>
where the IDs identify the source and operator nodes in the job graph between which the latency is being measured.
For example, I can determine the 95th percentile latency between the source and one of the sinks in a job I am running right now with this request:
http://localhost:8081/jobs/94b189a96b98b3aafaba6db6aa8b770b/metrics?get=latency.source_id.bc764cd8ddf7a0cff126f51c16239658.operator_id.fd0ee602f2fa8d310d9bd9f694e185f5.operator_subtask_index.0.latency_p95
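If you need to build such requests for several operators, the naming scheme above can be assembled programmatically. The following is only a minimal sketch: LatencyMetricUrl and its methods are hypothetical helper names (they are not part of Flink), and it assumes the IDs and the metric suffix are already known, for example from the list returned by the metrics endpoint.

```java
import java.net.URI;

// Hypothetical helper (not part of Flink): assembles a latency metric name
// and the REST query URL following the naming scheme described above.
final class LatencyMetricUrl {
    private LatencyMetricUrl() {}

    // Builds latency.source_id.<ID>.operator_id.<ID>.operator_subtask_index.<SUBTASK>.<metric>
    static String metricName(String sourceId, String operatorId, int subtaskIndex, String metric) {
        return "latency.source_id." + sourceId
                + ".operator_id." + operatorId
                + ".operator_subtask_index." + subtaskIndex
                + "." + metric;
    }

    // Builds <rest_endpoint>/jobs/<job_id>/metrics?get=<metric_name>
    static URI metricUri(String restEndpoint, String jobId, String metricName) {
        return URI.create(restEndpoint + "/jobs/" + jobId + "/metrics?get=" + metricName);
    }
}
```

Calling metricName with the two IDs from the request above, subtask index 0 and latency_p95, and passing the result to metricUri together with http://localhost:8081 and the job ID, reproduces that request URL.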
Alternatively, you could use a ProcessFunction to add processing time timestamps to your events before they enter the CEP part of your job, and then use another ProcessFunction afterwards to measure the elapsed time.
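The logic of that second approach can be sketched without any Flink dependency. This is an illustration under assumptions, not a drop-in implementation: Stamped and ElapsedTimeSketch are hypothetical names, and the clock is injected so the behaviour can be checked in isolation. In an actual job, stamp would run inside the processElement of the ProcessFunction placed before CEP, elapsedMillis inside the one placed after it, and the clock would be ctx.timerService().currentProcessingTime().

```java
import java.util.function.LongSupplier;

// Hypothetical wrapper: an event plus the processing time (ms) at which it
// entered the CEP part of the job.
final class Stamped<T> {
    final T event;
    final long enteredAtMillis;

    Stamped(T event, long enteredAtMillis) {
        this.event = event;
        this.enteredAtMillis = enteredAtMillis;
    }
}

// Flink-free model of the two steps; the clock is injected for testability.
final class ElapsedTimeSketch {
    private final LongSupplier clock;

    ElapsedTimeSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Step 1 (before CEP): attach the current processing time to the event.
    <T> Stamped<T> stamp(T event) {
        return new Stamped<>(event, clock.getAsLong());
    }

    // Step 2 (after CEP): time elapsed since the event was stamped.
    long elapsedMillis(Stamped<?> stamped) {
        return clock.getAsLong() - stamped.enteredAtMillis;
    }
}
```

With this shape, the pattern would be defined over the wrapped events, and the select function would receive every matched event together with the time it entered the pattern, so the elapsed time can be computed per event rather than only per match.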