apache-flink, flink-sql

Custom aggregate function in Flink: problem with type hint


I'm experimenting with Apache Flink for a project, using it to aggregate environmental data captured by a series of sensors. To calculate an air quality index, I'm trying to implement a custom aggregate function for use in a grouped select over a window, but I'm running into a problem with the type hint. Here's the function code with the DataTypeHint annotation:

@FunctionHint(
        input = {@DataTypeHint("INT"), @DataTypeHint("INT"), @DataTypeHint("INT")},
        accumulator = @DataTypeHint("AQIAccumulator"),
        output = @DataTypeHint("INT")
)
public class AQI extends AggregateFunction<Integer, AQIAccumulator> {

    @Override
    public AQIAccumulator createAccumulator() {
        return new AQIAccumulator();
    }

    @Override
    public Integer getValue(AQIAccumulator acc) {
        return 100;
    }

    public void accumulate(AQIAccumulator acc, int pm10, int pm25, int co) {
        System.out.println("PM10: " + pm10 + ", PM25: " + pm25 + ", CO: " + co);
    }

    public void retract(AQIAccumulator acc, int pm10, int pm25, int co) {

    }

    public void merge(AQIAccumulator acc, Iterable<AQIAccumulator> it) {

    }

}

But I get the following exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.innovaway.seneca.jobs.envdata.functions.AQI'. Please check for implementation mistakes and/or provide a corresponding hint.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
    at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:211)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:121)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
    at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
    at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
    at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:270)
    at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:783)
    at com.innovaway.seneca.jobs.envdata.EnvironmentDataAggregationJob.main(EnvironmentDataAggregationJob.java:88)
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
    ... 26 more
Caused by: org.apache.flink.table.api.ValidationException: Error in function hint annotation.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.TemplateUtils.lambda$asFunctionTemplates$0(TemplateUtils.java:66)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.flink.table.types.extraction.TemplateUtils.asFunctionTemplates(TemplateUtils.java:69)
    at org.apache.flink.table.types.extraction.TemplateUtils.extractGlobalFunctionTemplates(TemplateUtils.java:46)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:151)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
    ... 28 more
Caused by: org.apache.flink.table.api.ValidationException: Error in data type hint annotation.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.FunctionTemplate.createResultTemplate(FunctionTemplate.java:86)
    at org.apache.flink.table.types.extraction.FunctionTemplate.fromAnnotation(FunctionTemplate.java:72)
    at org.apache.flink.table.types.extraction.TemplateUtils.lambda$asFunctionTemplates$0(TemplateUtils.java:64)
    ... 40 more
Caused by: org.apache.flink.table.api.TableException: User-defined types are not supported yet.
    at org.apache.flink.table.catalog.DataTypeFactoryImpl.resolveType(DataTypeFactoryImpl.java:189)
    at org.apache.flink.table.catalog.DataTypeFactoryImpl.access$100(DataTypeFactoryImpl.java:50)
    at org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:178)
    at org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:171)
    at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:202)
    at org.apache.flink.table.types.logical.UnresolvedUserDefinedType.accept(UnresolvedUserDefinedType.java:104)
    at org.apache.flink.table.catalog.DataTypeFactoryImpl.createDataType(DataTypeFactoryImpl.java:80)
    at org.apache.flink.table.types.extraction.DataTypeTemplate.extractDataType(DataTypeTemplate.java:297)
    at org.apache.flink.table.types.extraction.DataTypeTemplate.fromAnnotation(DataTypeTemplate.java:112)
    at org.apache.flink.table.types.extraction.FunctionTemplate.createResultTemplate(FunctionTemplate.java:84)
    ... 42 more

Process finished with exit code 1

What am I doing wrong?


Solution

The string form of a data type hint only works with SQL types, which is why the hint "AQIAccumulator" fails with "User-defined types are not supported yet". For POJOs and other classes, use @DataTypeHint(bridgedTo = AQIAccumulator.class) instead.

Alternatively, you can override getTypeInference and provide all components programmatically.

For your example, though, Flink should be able to derive all types automatically via reflection, so no hints are required.
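
For reflection to derive the accumulator type, the accumulator has to be a class Flink can analyze. The question does not show AQIAccumulator, so the following is only a minimal sketch of what such a POJO could look like. The field names and the add, remove, mergeFrom and meanPm10 helpers are assumptions made for illustration; they are not part of the original code or of Flink's API.

```java
// Hypothetical accumulator: the question never shows AQIAccumulator, so the
// fields and helper methods below are assumptions made for illustration.
// For use with Flink, declare the class public in its own file; the modifier
// is omitted here only so the sketch compiles as a single snippet.
class AQIAccumulator {
    // Public, non-final fields of simple types, so each one maps to a SQL type.
    public long pm10Sum;
    public long pm25Sum;
    public long coSum;
    public long count;

    // Public no-argument constructor, as expected of a POJO.
    public AQIAccumulator() {}

    // Logic that accumulate(acc, pm10, pm25, co) could delegate to.
    public void add(int pm10, int pm25, int co) {
        pm10Sum += pm10;
        pm25Sum += pm25;
        coSum += co;
        count++;
    }

    // Inverse of add, for retract(acc, pm10, pm25, co).
    public void remove(int pm10, int pm25, int co) {
        pm10Sum -= pm10;
        pm25Sum -= pm25;
        coSum -= co;
        count--;
    }

    // Folds other partial accumulators into this one, for merge(acc, it).
    public void mergeFrom(Iterable<AQIAccumulator> others) {
        for (AQIAccumulator other : others) {
            pm10Sum += other.pm10Sum;
            pm25Sum += other.pm25Sum;
            coSum += other.coSum;
            count += other.count;
        }
    }

    // Placeholder result: the mean PM10 reading, not a real AQI formula.
    public int meanPm10() {
        return count == 0 ? 0 : (int) (pm10Sum / count);
    }
}
```

With a class like this, accumulate, retract and merge in AQI could delegate to add, remove and mergeFrom. The accumulator hint could then either be dropped or written as @DataTypeHint(bridgedTo = AQIAccumulator.class), as the answer suggests.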