apache-flink, flink-streaming, flink-sql

In Flink, how to track a custom metric in a class that extends UserDefinedFunction


I have a simple function that handles a string containing a JSON array:

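// Jackson imports assume the unshaded com.fasterxml artifact is on the classpath.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ArraySizeUdf extends ScalarFunction {
    private static final Logger LOG = LoggerFactory.getLogger(ArraySizeUdf.class);
    private static final ObjectMapper mapper = new ObjectMapper();

    public int eval(String stringJsonArray) {
        try {
            if (stringJsonArray == null) {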
                return 0;
            }

            JsonNode actualObj = mapper.readTree(stringJsonArray);
            ArrayNode aa = (ArrayNode) actualObj;
            return aa.size();
        } catch (Exception e) {
            LOG.error("Error deserializing json to find size : {}", stringJsonArray);
            return -1;
        }
    }
}

I want to track the number of exceptions that occur here. Since I don't have access to getRuntimeContext(), what can I use to track custom metrics that I define here?

I have registered the UDF using streamTableEnvironment.createTemporaryFunction("array_size", ArraySizeUdf.class);

and I am using it as select array_size('["some", "other"]') from table;


Solution

  • Your ScalarFunction can override the open(FunctionContext context) method. The context passed in has a getMetricGroup() method, which returns a MetricGroup on which you can register custom metrics such as a counter.
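
As a rough illustration of the answer above, here is a minimal sketch of the question's UDF with a counter registered in open(). The metric name "arraySizeErrors" is a hypothetical name chosen for this example, and the Jackson imports assume the unshaded com.fasterxml artifact; adjust both to your setup.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.metrics.Counter;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class ArraySizeUdf extends ScalarFunction {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Marked transient: the counter is registered per task in open(),
    // not shipped with the serialized function instance.
    private transient Counter errorCounter;

    @Override
    public void open(FunctionContext context) throws Exception {
        // "arraySizeErrors" is a hypothetical metric name chosen for this sketch.
        errorCounter = context.getMetricGroup().counter("arraySizeErrors");
    }

    public int eval(String stringJsonArray) {
        if (stringJsonArray == null) {
            return 0;
        }
        try {
            JsonNode node = MAPPER.readTree(stringJsonArray);
            // The cast fails for non-array JSON, which is counted as an error too.
            return ((ArrayNode) node).size();
        } catch (Exception e) {
            // Count every failure so it shows up in the job's metrics.
            errorCounter.inc();
            return -1;
        }
    }
}
```

Registration and the SQL call stay the same as in the question; Flink calls open() once per task before the first eval(), so the counter is available by the time an exception can occur.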