Tags: java, apache-flink, flink-table-api

Datatype extraction not working when registering UDF with .createTemporaryFunction()


I have a table "vertices" with a custom datatype "Properties", which wraps a HashMap<String, PropertyValue> and is interpreted by Flink as the

RAW('org...impl.properties.Properties', '...') 

datatype. PropertyValue is also a custom datatype. Neither the Properties class nor the PropertyValue class is a POJO.

vertex_id   vertex_properties                             event_time
v1          Relevance=2:Integer, Weekday=Tuesday:String   2021-04-27 10:21:09.999

Now I want to extract each property value (relevance, weekday) and save it in its own column, which is done by a UDF "ExtractPropertyValue".

public class ExtractPropertyValue extends ScalarFunction {

    private final String propertyKey;

    public ExtractPropertyValue(String propertyKey) {
        this.propertyKey = propertyKey;
    }

    public PropertyValue eval(Properties p) {
        return p.get(propertyKey);
    }
}

which leads to the table

vertex_id   Relevance   Weekday   event_time
v1          2           Tuesday   2021-04-27 10:21:09.999

When I register the UDF "ExtractPropertyValue" with the deprecated method StreamTableEnvironment.registerFunction() and call it via the Table API, for example with the "Relevance" key,

streamTableEnvironment.registerFunction("ExtractPropertyValue", new ExtractPropertyValue("Relevance"));
vertices.select(call("ExtractPropertyValue", $("vertex_properties")));

I get the correct result table, as described above, but the datatype of the relevance column is now

LEGACY('RAW', 'ANY<org.gradoop.common.model.impl.properties.PropertyValue, rO0ABXNyADFvcmcuYXBhY2hlLmZsaW5rLmFwaS5qYXZhLnR5cGV1dGlscy5WYWx1ZVR5cGVJbmZvAAAAAAAAAAECAAFMAAR0eXBldAARTGphdmEvbGFuZy9DbGFzczt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLlR5cGVJbmZvcm1hdGlvbpSNyEi6s3rrAgAAeHB2cgA2b3JnLmdyYWRvb3AuY29tbW9uLm1vZGVsLmltcGwucHJvcGVydGllcy5Qcm9wZXJ0eVZhbHVlAAAAAAAAAAECAAFMAAV2YWx1ZXQAEkxqYXZhL2xhbmcvT2JqZWN0O3hw>')

and no longer a RAW(....) type. Later on, there are problems converting those LEGACY datatypes, so I tried registering the function with the non-deprecated method and calling it the same way.

streamTableEnvironment.createTemporaryFunction("ExtractPropertyValue", new ExtractPropertyValue("Relevance"));
vertices.select(call("ExtractPropertyValue", $("vertex_properties")));

But now an org.apache.flink.table.api.ValidationException is thrown, because Flink can no longer extract the datatype of the Properties class.

org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class org...impl.properties.Properties'. Please pass the required data type manually or allow RAW types.
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class org...impl.properties.Properties'. Interpreting it as a structured type was also not successful.
Caused by: org.apache.flink.table.api.ValidationException: Field 'properties' of class 'org...impl.properties.Properties' is neither publicly accessible nor does it have a corresponding getter method.
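To make the last "Caused by" concrete: the structural check that fails boils down to "every non-public field needs a public getter". The following is a plain-Java illustration of that rule, not Flink's actual implementation; the nested Properties class here is a hypothetical stand-in for the real one.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;

public class PojoCheck {

    // Hypothetical stand-in for the real Properties class: it hides
    // its map in a private field and offers no getter for it.
    static class Properties {
        private final HashMap<String, Object> properties = new HashMap<>();
    }

    // Rough illustration of the check behind the ValidationException:
    // every non-public field must have a matching public getter.
    static boolean hasAccessibleFields(Class<?> clazz) {
        for (Field field : clazz.getDeclaredFields()) {
            if (Modifier.isPublic(field.getModifiers())) {
                continue;
            }
            String getter = "get"
                    + field.getName().substring(0, 1).toUpperCase()
                    + field.getName().substring(1);
            try {
                clazz.getMethod(getter); // throws if no public getProperties()
            } catch (NoSuchMethodException e) {
                return false; // field is neither public nor has a getter
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(hasAccessibleFields(Properties.class)); // prints "false"
    }
}
```

Since neither Properties nor PropertyValue passes such a check, Flink cannot interpret them as structured types and falls back to requiring an explicit data type.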

As I said, I cannot change the underlying Properties and PropertyValue classes. I also tried adding @FunctionHint annotations to my "ExtractPropertyValue" class, like

@FunctionHint(
        input = @DataTypeHint(bridgedTo = Properties.class, allowRawPattern = "TRUE"),
        output = @DataTypeHint(bridgedTo = PropertyValue.class))
public class ExtractPropertyValue extends ScalarFunction {
....
}

But none of the annotations I tried fixed the error. Is the problem that the updated Table API only supports POJOs as custom datatypes? I'm really grateful for every answer, since I don't know what else to change.


Solution

  • In case someone stumbles over this question: the Flink annotations were the correct solution, I had just placed them wrong.

    I have UDFs like "ExtractPropertyValue" with the eval method described here: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/udfs/

    One @FunctionHint has to go above the class itself:

    @FunctionHint(output = @DataTypeHint(value= "RAW", bridgedTo = PropertyValue.class))
    public class ExtractPropertyValue extends ScalarFunction {...}
    

    where PropertyValue is the custom datatype this function should extract from the table column. The other datatype hint goes above the eval method itself:

    @FunctionHint(input = @DataTypeHint(inputGroup = InputGroup.ANY))
    public PropertyValue eval(Object propertiesO) {
        Properties properties = (Properties) propertiesO;
        return properties.get(propertyKey);
    }
    

    I also had this problem with aggregate functions, which implement an accumulate method. There, a function hint bridging the accumulator type was necessary:

    @FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = AvgAcc.class),
      input = @DataTypeHint(inputGroup = InputGroup.ANY))
    public void accumulate(AvgAcc acc, Object iValueO) {...}
    

    Hope this helps someone!
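Putting the two hints from this answer together, the complete scalar UDF would look roughly like this. This is a sketch reconstructed from the snippets above; the Properties and PropertyValue classes come from the question and are not shown, and the registration call is the one from the question.

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// Class-level hint: declare the output as a RAW type bridged to PropertyValue,
// so Flink does not try to interpret it as a structured type.
@FunctionHint(output = @DataTypeHint(value = "RAW", bridgedTo = PropertyValue.class))
public class ExtractPropertyValue extends ScalarFunction {

    private final String propertyKey;

    public ExtractPropertyValue(String propertyKey) {
        this.propertyKey = propertyKey;
    }

    // Method-level hint: accept any input and cast manually, so Flink
    // never attempts reflective type extraction on Properties.
    @FunctionHint(input = @DataTypeHint(inputGroup = InputGroup.ANY))
    public PropertyValue eval(Object propertiesO) {
        Properties properties = (Properties) propertiesO;
        return properties.get(propertyKey);
    }
}
```

Registered via streamTableEnvironment.createTemporaryFunction("ExtractPropertyValue", new ExtractPropertyValue("Relevance")) and called with vertices.select(call("ExtractPropertyValue", $("vertex_properties"))), as in the question, this avoids the extraction error on the non-deprecated path.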