I'm experimenting with Apache Flink for a project, using it to aggregate environmental data captured by a series of sensors. To calculate an air quality index, I'm trying to implement a custom aggregate function to use in a grouped select with a window, but I have a problem with the type hint. Here's the function code with the DataTypeHint annotation:
@FunctionHint(
    input = {@DataTypeHint("INT"), @DataTypeHint("INT"), @DataTypeHint("INT")},
    accumulator = @DataTypeHint("AQIAccumulator"),
    output = @DataTypeHint("INT")
)
public class AQI extends AggregateFunction<Integer, AQIAccumulator> {

    @Override
    public AQIAccumulator createAccumulator() {
        return new AQIAccumulator();
    }

    @Override
    public Integer getValue(AQIAccumulator acc) {
        return 100;
    }

    public void accumulate(AQIAccumulator acc, int pm10, int pm25, int co) {
        System.out.println("PM10: " + pm10 + ", PM25: " + pm25 + ", CO: " + co);
    }

    public void retract(AQIAccumulator acc, int pm10, int pm25, int co) {
    }

    public void merge(AQIAccumulator acc, Iterable<AQIAccumulator> it) {
    }
}
But I get the following exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.innovaway.seneca.jobs.envdata.functions.AQI'. Please check for implementation mistakes and/or provide a corresponding hint.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:211)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:121)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:270)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:783)
at com.innovaway.seneca.jobs.envdata.EnvironmentDataAggregationJob.main(EnvironmentDataAggregationJob.java:88)
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
... 26 more
Caused by: org.apache.flink.table.api.ValidationException: Error in function hint annotation.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at org.apache.flink.table.types.extraction.TemplateUtils.lambda$asFunctionTemplates$0(TemplateUtils.java:66)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.apache.flink.table.types.extraction.TemplateUtils.asFunctionTemplates(TemplateUtils.java:69)
at org.apache.flink.table.types.extraction.TemplateUtils.extractGlobalFunctionTemplates(TemplateUtils.java:46)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:151)
at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
... 28 more
Caused by: org.apache.flink.table.api.ValidationException: Error in data type hint annotation.
at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
at org.apache.flink.table.types.extraction.FunctionTemplate.createResultTemplate(FunctionTemplate.java:86)
at org.apache.flink.table.types.extraction.FunctionTemplate.fromAnnotation(FunctionTemplate.java:72)
at org.apache.flink.table.types.extraction.TemplateUtils.lambda$asFunctionTemplates$0(TemplateUtils.java:64)
... 40 more
Caused by: org.apache.flink.table.api.TableException: User-defined types are not supported yet.
at org.apache.flink.table.catalog.DataTypeFactoryImpl.resolveType(DataTypeFactoryImpl.java:189)
at org.apache.flink.table.catalog.DataTypeFactoryImpl.access$100(DataTypeFactoryImpl.java:50)
at org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:178)
at org.apache.flink.table.catalog.DataTypeFactoryImpl$LogicalTypeResolver.defaultMethod(DataTypeFactoryImpl.java:171)
at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:202)
at org.apache.flink.table.types.logical.UnresolvedUserDefinedType.accept(UnresolvedUserDefinedType.java:104)
at org.apache.flink.table.catalog.DataTypeFactoryImpl.createDataType(DataTypeFactoryImpl.java:80)
at org.apache.flink.table.types.extraction.DataTypeTemplate.extractDataType(DataTypeTemplate.java:297)
at org.apache.flink.table.types.extraction.DataTypeTemplate.fromAnnotation(DataTypeTemplate.java:112)
at org.apache.flink.table.types.extraction.FunctionTemplate.createResultTemplate(FunctionTemplate.java:84)
... 42 more
Process finished with exit code 1
What am I doing wrong?
The string version of a data type hint only works with SQL types. For POJOs and other classes, you can use @DataTypeHint(bridgedTo = AQIAccumulator.class).
Alternatively, you can simply override getTypeInference and provide all components programmatically.
But for your example, Flink should be smart enough to derive all types automatically using reflection. No hints required.
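For reflection to work, the accumulator itself has to be a class Flink can inspect. The question doesn't show AQIAccumulator, so here is a minimal sketch of what it might look like, assuming a plain POJO: a public static class with a public no-argument constructor and public fields. The field names, the AqiSketch holder class, and the mean-PM10 result are all hypothetical placeholders, not a real air quality formula. The accumulate/retract/merge/getValue logic is written as static methods so the sketch runs without Flink on the classpath; in the real function they would be the instance methods of the AQI class from the question.

```java
// Holder class so the sketch compiles as a single file; hypothetical, not part of Flink.
class AqiSketch {

    // Hypothetical accumulator in POJO shape: public static class,
    // public no-arg constructor, public fields.
    public static class AQIAccumulator {
        public long pm10Sum;
        public long pm25Sum;
        public long coSum;
        public long count;

        public AQIAccumulator() {
        }
    }

    // Folds one sensor reading into the accumulator state.
    public static void accumulate(AQIAccumulator acc, int pm10, int pm25, int co) {
        acc.pm10Sum += pm10;
        acc.pm25Sum += pm25;
        acc.coSum += co;
        acc.count++;
    }

    // Undoes a previously accumulated reading.
    public static void retract(AQIAccumulator acc, int pm10, int pm25, int co) {
        acc.pm10Sum -= pm10;
        acc.pm25Sum -= pm25;
        acc.coSum -= co;
        acc.count--;
    }

    // Adds the state of the other accumulators into acc.
    public static void merge(AQIAccumulator acc, Iterable<AQIAccumulator> others) {
        for (AQIAccumulator other : others) {
            acc.pm10Sum += other.pm10Sum;
            acc.pm25Sum += other.pm25Sum;
            acc.coSum += other.coSum;
            acc.count += other.count;
        }
    }

    // Placeholder result (mean PM10), standing in for the real index calculation.
    public static Integer getValue(AQIAccumulator acc) {
        return acc.count == 0 ? null : (int) (acc.pm10Sum / acc.count);
    }

    public static void main(String[] args) {
        AQIAccumulator acc = new AQIAccumulator();
        accumulate(acc, 40, 20, 5);
        accumulate(acc, 60, 30, 7);
        retract(acc, 60, 30, 7);
        System.out.println(getValue(acc)); // prints 40
    }
}
```

If you do keep the hints, only the accumulator entry needs to change, to the bridgedTo form mentioned above.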