
What is the non-deprecated way to give Type Hints to flink output streams?


I have been running into InvalidTypesExceptions in Flink, typically when customizing a generic SourceFunction<OUT>. Here's an example which, when added to my StreamExecutionEnvironment, throws these exceptions at runtime:

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class MyCustomSource<OUT> extends RichSourceFunction<OUT> {
    @Override
    public void run(SourceContext<OUT> sourceContext) throws Exception {
        OUT foo = null;
        // ... creates foo somehow ...
        sourceContext.collect(foo);
    }
    @Override
    public void cancel() {
        // ...
    }
}

The relevant exception text is:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'OUT' in 'class org.apache.flink.streaming.api.functions.source.RichSourceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

This happens whether OUT is a POJO, a generic type, or a Flink-internal type such as a Tuple.
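As the exception message says, this is a type erasure problem. The plain-Java sketch below (no Flink required) shows why: two parameterizations of a generic class share a single runtime class, so nothing about OUT can be recovered by reflecting on `new MyCustomSource<String>()`. An anonymous subclass, by contrast, records its type argument in its generic superclass, which is the idiom that lets Flink's TypeHint be written as `new TypeHint<...>(){}`. The names `TypeCapture`, `ErasureDemo` and `Box` are hypothetical and exist only for this illustration.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical helper modeled on the "super type token" idiom.
abstract class TypeCapture<T> {
    final Type captured;

    protected TypeCapture() {
        // getClass() is the anonymous subclass; its generic superclass,
        // e.g. TypeCapture<List<String>>, is stored in its class file.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.captured = superType.getActualTypeArguments()[0];
    }
}

class ErasureDemo {
    // Same generic shape as MyCustomSource<OUT>: OUT is not kept at runtime.
    static class Box<OUT> { }

    // Both instances share one Class object, so OUT cannot be recovered from it.
    static boolean erasedClassesAreEqual() {
        return new Box<String>().getClass() == new Box<Integer>().getClass();
    }

    // The anonymous subclass preserves List<String> through erasure.
    static String capturedTypeName() {
        return new TypeCapture<List<String>>() { }.captured.getTypeName();
    }
}
```

`erasedClassesAreEqual()` returns true, while `capturedTypeName()` returns `"java.util.List<java.lang.String>"`. This is why Flink needs the type supplied explicitly, either through a hint or through the function itself.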

I've found a reliable way to avoid this by adding a Type Hint via the returns() method. For example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new MyCustomSource<String>())
   .returns(String.class)
   //.etc.

But this method is deprecated in Flink 1.1.4. Does anybody know the non-deprecated way to provide a Type Hint? The Flink Internals wiki only mentions returns(), but it was last updated over a year ago.


Solution

  • Your MyCustomSource should implement the ResultTypeQueryable interface and return its output type to Flink as a TypeInformation from getProducedType().

    See https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/common/index.html#type-erasure--type-inference
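A minimal sketch of that approach, assuming flink-streaming-java (and its flink-core dependency) is on the classpath. The source takes its output TypeInformation as a constructor argument, because it cannot derive it from OUT, and hands it back to Flink through getProducedType(). Passing the type through the constructor is an illustrative design choice, not the only option. TypeInformation is Serializable, so storing it in the function is safe when Flink serializes the source.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

class MyCustomSource<OUT> extends RichSourceFunction<OUT> implements ResultTypeQueryable<OUT> {
    // Supplied by the caller because OUT is erased at runtime.
    private final TypeInformation<OUT> producedType;

    MyCustomSource(TypeInformation<OUT> producedType) {
        this.producedType = producedType;
    }

    @Override
    public void run(SourceContext<OUT> sourceContext) throws Exception {
        // ... create elements of type OUT and emit them with sourceContext.collect(...) ...
    }

    @Override
    public void cancel() {
        // ... signal run() to stop ...
    }

    @Override
    public TypeInformation<OUT> getProducedType() {
        // Flink asks the function for its output type instead of reflecting on OUT.
        return producedType;
    }
}
```

Usage would then look like `env.addSource(new MyCustomSource<>(BasicTypeInfo.STRING_TYPE_INFO))` with no returns() call. For generic output types, Flink versions that ship TypeHint let you build the TypeInformation as `new TypeHint<Tuple2<String, Integer>>(){}.getTypeInfo()`.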