I am trying to follow the steps here to create a basic Flink Aggregate UDF. I've added the dependencies () and implemented
public class MyAggregate extends AggregateFunction<Long, TestAgg> {..}
I've implemented the mandatory methods as well as a few others (accumulate, merge, etc.). All of this builds without errors. Now, according to the docs, I should be able to register this as:
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment sTableEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
sTableEnv.registerFunction("MyMin", new MyAggregate());
However, registerFunction seems to accept only a ScalarFunction as input. I am getting an incompatible type error:
The method registerFunction(String, ScalarFunction) in the type TableEnvironment is not applicable for the arguments (String, MyAggregate)
Any help would be great.
You need to import the StreamTableEnvironment for your chosen language, which in your case is org.apache.flink.table.api.java.StreamTableEnvironment.
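org.apache.flink.table.api.StreamTableEnvironment is a common abstract class for the Java and Scala variants of StreamTableEnvironment. With that import, the compiler only sees the registerFunction(String, ScalarFunction) overload inherited from TableEnvironment, which is the one named in your error message. We've noticed that this part of the API is confusing for users and we will improve it in the future.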
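
To illustrate why the import matters, here is a minimal, self-contained Java sketch of the same situation. It does not use Flink at all: BaseTableEnv, JavaTableEnv and the empty ScalarFunction / AggregateFunction classes are hypothetical stand-ins that only mimic the shape of the API described above (a common base type with the scalar overload, and a language-specific subtype that adds the aggregate overload). Java picks overloads based on the declared type of the variable, so the aggregate overload is only visible when the variable is declared as the subtype.

```java
// Hypothetical stand-ins that mimic the shape of the API.
// These are NOT Flink's real classes; they exist only for this illustration.
class ScalarFunction {}

class AggregateFunction<T, ACC> {}

// Plays the role of the common base class: it only knows scalar functions.
abstract class BaseTableEnv {
    public String registerFunction(String name, ScalarFunction f) {
        return "scalar:" + name;
    }
}

// Plays the role of the Java-specific variant: it adds the aggregate overload.
class JavaTableEnv extends BaseTableEnv {
    public <T, ACC> String registerFunction(String name, AggregateFunction<T, ACC> f) {
        return "aggregate:" + name;
    }
}

// Stand-in for the user's aggregate; the accumulator type is simplified to Long.
class MyAggregate extends AggregateFunction<Long, Long> {}

class OverloadDemo {
    static String register() {
        JavaTableEnv env = new JavaTableEnv();

        // Declaring the variable as the base type hides the aggregate overload:
        // BaseTableEnv base = env;
        // base.registerFunction("MyMin", new MyAggregate()); // does not compile

        // Declared as the subtype, the aggregate overload is found.
        return env.registerFunction("MyMin", new MyAggregate());
    }

    public static void main(String[] args) {
        System.out.println(register()); // prints "aggregate:MyMin"
    }
}
```

In your code the equivalent of switching from BaseTableEnv to JavaTableEnv is changing the import to org.apache.flink.table.api.java.StreamTableEnvironment; the rest of your registration code can stay as it is.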