I'm trying to create my custom metric variable according to this tutorial
With the sample code it's provided, I can get the events and the Histogram.
I'm confused how the identifier been used by prometheus & grafana. I also trying to modify the sample code little bit but the metric just no longer work.
Also, I'm only able to access the system metric but not my own.
My question is:
thanks in advance.
Here is the map function
class FlinkMetricsExposingMapFunction extends RichMapFunction<SensorReading, SensorReading> {
private static final long serialVersionUID = 1L;
private transient Counter eventCounter;
private transient Counter customCounter1;
private transient Counter customCounter2;
@Override
public void open(Configuration parameters) {
eventCounter = getRuntimeContext()
.getMetricGroup().counter("events");
customCounter1 = getRuntimeContext()
.getMetricGroup()
.addGroup("customCounterKey", "mod2")
.counter("counter1");
customCounter2 = getRuntimeContext()
.getMetricGroup()
.addGroup("customCounterKey", "mod5")
.counter("counter2");
// meter = getRuntimeContext().getMetricGroup().meter("eventMeter", new DropwizardMeterWrapper(dropwizardMeter));
}
@Override
public SensorReading map(SensorReading value) {
eventCounter.inc();
if (value.getCurrTimestamp() % 2 == 0)
customCounter1.inc();
if (value.getCurrTimestamp() % 5 == 0)
customCounter2.inc();
if (value.getCurrTimestamp() % 2 == 0 && value.getCurrTimestamp() % 5 == 0)
customCounter1.dec();
return value;
}
}
Example Job:
env
.addSource(new SimpleSensorReadingGenerator())
.name(SimpleSensorReadingGenerator.class.getSimpleName())
.map(new FlinkMetricsExposingMapFunction())
.name(FlinkMetricsExposingMapFunction.class.getSimpleName())
.print()
.name(DataStreamSink.class.getSimpleName());
Screenshot for access flink metrics from grafana:
flink-config.yaml
FROM flink:1.9.0
RUN echo "metrics.reporters: prom" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
echo "metrics.latency.interval: 1000" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
echo "metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
mv $FLINK_HOME/opt/flink-metrics-prometheus-*.jar $FLINK_HOME/lib
COPY --from=builder /home/gradle/build/libs/*.jar $FLINK_HOME/lib/
default map function from tutorial:
@Override
public void open(Configuration parameters) {
eventCounter = getRuntimeContext().getMetricGroup().counter("events");
valueHistogram = getRuntimeContext()
.getMetricGroup()
.histogram("value_histogram", new DescriptiveStatisticsHistogram(10_000_000));
}
The counter you created is accessible by <system-scope>. customCounterKey.mod2.counter1
. <system-scope>
is defined in your flink-conf.yaml. If you did not defined it there the default is <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
.
A metric group bascially defines a hierarchy of metric names. According to the documentation the metric-group is a named container for metrics. It consist of 3 parts (scopes): The system-scope (defined in flink-conf.yaml), a user scope(whatever you define in addGroup()
) and a metric name.
That depends on what you want to measure. For everything which you could detected for counters, gauges or meters I would go for the metrics. If it comes to histograms you should have a closer look on what you get from flink if you use the prometheus reporter. Flink generalizes all different metric frameworks - the way histogramms are implemented in prometheus is different than in e.g. graphite. The definition of buckets is given by flink and can't be changed as far as I know (despite some relection magic).
All this is described in more detail here: https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#registering-metrics
Hope that helps.