Tags: java, apache-flink, flink-streaming, metrics, flink-cep

Sending a side output when numRecordsInPerSecond is equal to 0 in a KeyedProcessFunction in Flink?


I would really appreciate some help with this.

I want to implement a use case where a KeyedProcessFunction sends a "done" signal when its numRecordsInPerSecond is equal to 0. The problem is that I cannot seem to access this metric out of the box. Here's a code snippet of roughly what I'm trying to achieve:

public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
    private final OutputTag<String> outputTag;

    @Override
    public void open(Configuration parameters) throws Exception {}

    public MyKeyedProcessFunction(OutputTag<String> outputTag) {
        this.outputTag = outputTag;
    }
    
    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
        // Emit to side output if numRecordsInPerSecond is 0
        double numRecordsInPerSecond = getRuntimeContext().getMetricGroup().getIOMetricGroup().getNumRecordsInPerSecond();
        if (numRecordsInPerSecond == 0) {
            ctx.output(outputTag, "done");
        }
    }
    
}

Is it even possible for my Flink program to access numRecordsInPerSecond for each operator? I know this metric is available in the web UI and can be exposed externally, but can it be accessed internally, from within the same Flink job?

Flink Version: 1.17.2


Solution

  • As @kkrugler pointed out, accessing metrics from within the job itself isn't straightforward; there's no API for that.

    But from within a KeyedProcessFunction you have everything you need to calculate the number of events being processed each second yourself. You can keep a counter that is incremented for each event, and arrange for a processing-time timer to fire every second to check the counter and reset it.
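
    The counter-plus-timer idea can be sketched without any Flink dependency, which makes the bookkeeping easy to run and check in isolation. This is only a model under stated assumptions, not the answerer's code: the class name IdleSignalSketch, the one-second INTERVAL_MS, and the advanceTo helper (which plays the role of processing time moving forward) are all hypothetical. In a real KeyedProcessFunction the count would presumably live in keyed state such as a ValueState<Long>, the timer would be registered with ctx.timerService().registerProcessingTimeTimer(...), and the "done" record would be emitted with ctx.output(outputTag, "done") from onTimer. Note that the check has to happen in the timer callback: processElement only runs when a record arrives, so at that moment the rate is never zero.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Flink-free model of "count per key, check the count once per second". */
class IdleSignalSketch {
    static final long INTERVAL_MS = 1000L;

    // Stands in for keyed ValueState<Long>: events seen per key in the current interval.
    private final Map<String, Long> counts = new HashMap<>();
    // Stands in for the timer service: at most one pending timer per key.
    private final Map<String, Long> timers = new HashMap<>();
    // Stands in for the side output: keys for which "done" was emitted.
    private final List<String> doneKeys = new ArrayList<>();

    /** Role of processElement: count the event and arm a timer if none is pending. */
    void processElement(String key, long now) {
        counts.merge(key, 1L, Long::sum);
        timers.putIfAbsent(key, now + INTERVAL_MS);
    }

    /** Role of onTimer: an empty interval means the key has gone quiet. */
    private void onTimer(String key, long timestamp) {
        long seen = counts.getOrDefault(key, 0L);
        if (seen == 0L) {
            doneKeys.add(key);      // in Flink: ctx.output(outputTag, "done")
            counts.remove(key);     // in Flink: clear the state; no new timer
        } else {
            counts.put(key, 0L);    // start a fresh interval
            timers.put(key, timestamp + INTERVAL_MS);
        }
    }

    /** Simulates processing time advancing; fires every timer due at or before now. */
    void advanceTo(long now) {
        boolean fired = true;
        while (fired) {
            fired = false;
            for (String key : new ArrayList<>(timers.keySet())) {
                Long due = timers.get(key);
                if (due != null && due <= now) {
                    timers.remove(key);
                    onTimer(key, due);
                    fired = true;
                }
            }
        }
    }

    List<String> doneKeys() {
        return doneKeys;
    }

    public static void main(String[] args) {
        IdleSignalSketch sketch = new IdleSignalSketch();
        sketch.processElement("a", 0L);
        sketch.processElement("a", 100L);
        sketch.advanceTo(1000L);   // two events seen, so the timer is re-armed for t=2000
        sketch.advanceTo(2000L);   // no events since t=1000, so "a" is reported done
        System.out.println(sketch.doneKeys());   // prints [a]
    }
}
```

    One design choice worth noting: the sketch stops re-arming the timer once a key has been reported as done, so "done" is emitted once per quiet period rather than every second. A new event for that key arms a fresh timer.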