Search code examples
apache-flinkflink-streamingflink-sql

Registering an Aggregate UDF in Apache Flink


I am trying to follow the steps here to create a basic Flink Aggregate UDF. I've added the dependencies () and implemented

public class MyAggregate extends AggregateFunction<Long, TestAgg> {..}

I've implemented the mandatory methods as well as a few other: accumulate, merge, etc. All this builds without errors. Now according to the docs, I should be able to register this as

    StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment sTableEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
    sTableEnv.registerFunction("MyMin", new MyAggregate());

But, the registerFucntion seems to want a ScalarFunction only as input. I am getting an incompatible type error: The method registerFunction(String, ScalarFunction) in the type TableEnvironment is not applicable for the arguments (String, MyAggregate)

Any help would be great.


Solution

  • You need to import the StreamTableEnvironment for your chosen language which is in your case org.apache.flink.table.api.java.StreamTableEnvironment.

    org.apache.flink.table.api.StreamTableEnvironment is a common abstract class for the Java and Scala variants of StreamTableEnvironment. We've noticed that this part of the API is confusing for users and we will improve it in the future.