We are using Flink 1.8.0 and running it on EMR - Yarn and would like to measure the throughput.
public class AsyncClass extends RichAsyncFunction<String, String> {
private transient Counter counter;
private transient Meter meter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
}
@Override
public void close() throws Exception {
super.close();
ExecutorUtils.gracefulShutdown(20000, TimeUnit.MILLISECONDS, executorService);
}
@Override
public void asyncInvoke(String key, final ResultFuture<String> resultFuture) throws Exception {
resultFuture.complete(key);
this.meter.markEvent();
this.counter.inc();
}
}
Turns out the meter displays whole number values and the rate is measured in decimals. When my load was a constant 1 event per second, it was actually measured as 0.9xxx something and hence was showing only 0 events per second.