
How to export Flink custom user metrics to Prometheus & Grafana


I'm trying to create a custom metric by following this tutorial.

With the sample code it provides, I can get the events counter and the histogram.

I'm confused about how the identifier is used by Prometheus and Grafana. I also tried modifying the sample code a little, but then the metric no longer works.

Also, I can only access the system metrics, not my own.

My questions are:

  1. How can I access the counter I created, for example counter1?
  2. What exactly is the metricGroup?
  3. Suppose I'd like to detect a pattern in an input stream. Is it more reasonable to do that with a metric, or to just write the result to a time series database like InfluxDB?

Thanks in advance.

Here is the map function:

class FlinkMetricsExposingMapFunction extends RichMapFunction<SensorReading, SensorReading> {
  private static final long serialVersionUID = 1L;

  private transient Counter eventCounter;
  private transient Counter customCounter1;
  private transient Counter customCounter2;

  @Override
  public void open(Configuration parameters) {
    eventCounter = getRuntimeContext()
        .getMetricGroup().counter("events");

    customCounter1 = getRuntimeContext()
        .getMetricGroup()
        .addGroup("customCounterKey", "mod2")
        .counter("counter1");
    customCounter2 = getRuntimeContext()
        .getMetricGroup()
        .addGroup("customCounterKey", "mod5")
        .counter("counter2");

 //    meter = getRuntimeContext().getMetricGroup().meter("eventMeter", new DropwizardMeterWrapper(dropwizardMeter));
  }

  @Override
  public SensorReading map(SensorReading value) {
    eventCounter.inc();
    if (value.getCurrTimestamp() % 2 == 0)
      customCounter1.inc();

    if (value.getCurrTimestamp() % 5 == 0)
      customCounter2.inc();

    if (value.getCurrTimestamp() % 2 == 0 && value.getCurrTimestamp() % 5 == 0)
      customCounter1.dec();
    return value;
  }
}

Example Job:

env
    .addSource(new SimpleSensorReadingGenerator())
    .name(SimpleSensorReadingGenerator.class.getSimpleName())
    .map(new FlinkMetricsExposingMapFunction())
    .name(FlinkMetricsExposingMapFunction.class.getSimpleName())
    .print()
    .name(DataStreamSink.class.getSimpleName());

Update

[Screenshot: Flink metrics accessed from Grafana]

Dockerfile (appends the reporter settings to flink-conf.yaml):

FROM flink:1.9.0
RUN echo "metrics.reporters: prom" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
    echo "metrics.latency.interval: 1000" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
    echo "metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
    mv $FLINK_HOME/opt/flink-metrics-prometheus-*.jar $FLINK_HOME/lib
COPY --from=builder /home/gradle/build/libs/*.jar $FLINK_HOME/lib/

Default map function from the tutorial:

 @Override
 public void open(Configuration parameters) {
    eventCounter = getRuntimeContext().getMetricGroup().counter("events");
    valueHistogram = getRuntimeContext()
            .getMetricGroup()
            .histogram("value_histogram", new DescriptiveStatisticsHistogram(10_000_000));
 }


Solution

    1. The counter you created is accessible as <system-scope>.customCounterKey.mod2.counter1. The <system-scope> is defined in your flink-conf.yaml. If you did not define it there, the default is <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.

    2. A metric group basically defines a hierarchy of metric names. According to the documentation, a metric group is a named container for metrics. The full identifier consists of three parts (scopes): the system scope (defined in flink-conf.yaml), a user scope (whatever you define in addGroup()), and a metric name.

    3. That depends on what you want to measure. For everything you can detect with counters, gauges, or meters, I would go for metrics. When it comes to histograms, you should take a closer look at what you get from Flink if you use the Prometheus reporter. Flink generalizes across different metric frameworks, and the way histograms are implemented in Prometheus differs from, e.g., Graphite. The definition of buckets is given by Flink and can't be changed as far as I know (short of some reflection magic).
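To make points 1 and 2 concrete, here is a minimal sketch in plain Java (no Flink dependency) of how the three scopes combine into the dotted identifier. The class name MetricIdentifierSketch, the helper identifier(), and the host, TaskManager id, and job name values are hypothetical and only serve the illustration; Flink builds the real string internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: mimics how system scope, user scope and metric name
// are joined with '.' into one identifier. Not Flink's actual implementation.
class MetricIdentifierSketch {

    static String identifier(List<String> systemScope, List<String> userScope, String metricName) {
        List<String> parts = new ArrayList<>(systemScope); // <host>.taskmanager.<tm_id>...
        parts.addAll(userScope);                           // whatever addGroup() added
        parts.add(metricName);                             // name passed to counter(...)
        return String.join(".", parts);
    }

    public static void main(String[] args) {
        // Hypothetical values filling in the default operator scope format.
        List<String> systemScope = Arrays.asList(
            "localhost", "taskmanager", "tm-1", "myJob",
            "FlinkMetricsExposingMapFunction", "0");
        // User scope from addGroup("customCounterKey", "mod2").
        List<String> userScope = Arrays.asList("customCounterKey", "mod2");

        System.out.println(identifier(systemScope, userScope, "counter1"));
        // prints localhost.taskmanager.tm-1.myJob.FlinkMetricsExposingMapFunction.0.customCounterKey.mod2.counter1
    }
}
```

The same pattern with "mod5" and "counter2" gives the identifier of the second counter from the question.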

    All this is described in more detail here: https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#registering-metrics
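One caveat for the Prometheus side: as far as I understand the Prometheus reporter, it does not expose the dotted identifier as-is. It appears to build the metric name from a "flink_" prefix plus the logical scope, and to export scope variables as labels. With addGroup(key, value), the key should end up in the name and the value in a label. The following plain Java sketch only mimics that naming under these assumptions; PrometheusNameSketch and its helpers are hypothetical and not part of Flink, so check the result against what your /metrics endpoint actually returns.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: approximates how a Flink metric might be named when
// scraped by Prometheus. The naming rules here are assumptions, not Flink code.
class PrometheusNameSketch {

    // Prometheus metric names may only contain [a-zA-Z0-9:_]; everything else becomes '_'.
    static String sanitize(String s) {
        return s.replaceAll("[^a-zA-Z0-9:_]", "_");
    }

    // Assumed shape: flink_<logical scope>_<metric name>.
    static String metricName(String logicalScope, String name) {
        return "flink_" + sanitize(logicalScope) + "_" + sanitize(name);
    }

    // Renders name{label="value",...} the way a sample shows up in a query.
    static String sample(String name, Map<String, String> labels) {
        StringBuilder sb = new StringBuilder(name).append('{');
        boolean first = true;
        for (Map.Entry<String, String> e : labels.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(sanitize(e.getKey())).append("=\"").append(e.getValue()).append('"');
            first = false;
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // Assumed logical scope for an operator metric, plus the key from addGroup(key, value).
        String name = metricName("taskmanager.job.task.operator.customCounterKey", "counter1");

        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("customCounterKey", "mod2"); // the value from addGroup(key, value)
        labels.put("operator_name", "FlinkMetricsExposingMapFunction");

        System.out.println(sample(name, labels));
    }
}
```

If that assumption holds, searching Grafana for a metric containing "customCounterKey_counter1" and filtering on the customCounterKey label should find the counter.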

    Hope that helps.