apache-flink

What Metrics does a Flink MetricReporter get?


I have several Apache Flink (batch, not streaming) workflows, and my goal is to get some custom metrics from Flink into CloudWatch. I'm pretty sure the process is going to look like this:

  1. In my Flink code, create some metric objects, e.g. getRuntimeContext().getMetricGroup().counter("numInputElements");

  2. Implement a custom MetricReporter and register it so that Flink sends my Counter from step 1 to the Reporter

  3. Aggregate the metrics in the MetricReporter: all metrics called "numInputElements" should be summed so that, in the end, the MetricReporter only has to put one "numInputElements" metric in CloudWatch.

  4. When the MetricReporter's report() is called, it puts the "numInputElements" metric in CloudWatch.

My questions relate to step 3:

  1. How many MetricReporter instances will I have? If there is more than one per Flink workflow, I will need to aggregate metrics in CloudWatch as well as in my MetricReporter logic. If there is one per slice running my workflow, I only need to aggregate in CloudWatch, because I will have n slices each creating one "numInputElements" metric and n MetricReporters each getting one "numInputElements" to send to CloudWatch.

  2. If I have more than one MetricReporter instance, what determines which metrics get sent to each MetricReporter?


Solution

  • The JobManager and each TaskManager have their own metrics reporter instances, and a reporter only sees the metrics registered in its own process. Each parallel slice (subtask) of your user functions has its own metrics, with individual, per-subtask names. Metrics aggregation is typically done outside Flink, with queries that aggregate over the metric names (which you can control via their scope formats).

    The metrics reporters are fairly simple; take a look at some of them in the Flink sources.
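
    As a rough illustration of what "aggregating over the metric names" could look like, here is a minimal plain-Java sketch that does not use the Flink API. It assumes each per-subtask counter arrives as a dot-separated identifier with the user-chosen metric name as the last component, which is roughly what the default operator scope format produces. The class name MetricNameAggregator, the method sumByName, and the sample hosts, job, and operator names are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class MetricNameAggregator {

    /**
     * Sums counter values whose scoped identifiers end in the same metric name.
     * Assumption: identifiers are dot-separated and the metric name comes last.
     */
    public static Map<String, Long> sumByName(Map<String, Long> countsByIdentifier) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> entry : countsByIdentifier.entrySet()) {
            String identifier = entry.getKey();
            // Everything after the last '.' is treated as the metric name;
            // an identifier without any '.' is used as-is.
            String name = identifier.substring(identifier.lastIndexOf('.') + 1);
            totals.merge(name, entry.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical identifiers: one operator, parallelism 3, two TaskManagers.
        Map<String, Long> reported = new LinkedHashMap<>();
        reported.put("host-a.taskmanager.tm-1.myJob.myMapper.0.numInputElements", 120L);
        reported.put("host-a.taskmanager.tm-1.myJob.myMapper.1.numInputElements", 95L);
        reported.put("host-b.taskmanager.tm-2.myJob.myMapper.2.numInputElements", 110L);

        // prints {numInputElements=325}
        System.out.println(sumByName(reported));
    }
}
```

    Because each TaskManager's reporter only knows about its own subtasks, a sum like this inside a reporter would still be partial whenever the job spans several TaskManagers; the final sum across processes would have to happen in CloudWatch or another external system. The sketch also breaks if the metric name itself contains a dot.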