I have been running into InvalidTypesExceptions in Flink, typically when customizing a generic SourceFunction<OUT>. Here's an example which, when added to my StreamExecutionEnvironment, throws these exceptions at runtime:
    public class MyCustomSource<OUT> extends RichSourceFunction<OUT> {
        @Override
        public void run(SourceContext<OUT> sourceContext) throws Exception {
            OUT foo = null;
            // ... creates foo somehow ...
            sourceContext.collect(foo);
        }

        @Override
        public void cancel() {
            // ...
        }
    }
The relevant exception text is:
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'OUT' in 'class org.apache.flink.streaming.api.functions.source.RichSourceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
This happens whether OUT is a POJO, a generic type, a Flink internal type like a Tuple, etc.
I've found a reliable way to avoid this by adding a type hint via the returns() method. For example:
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(new MyCustomSource<String>())
        .returns(String.class)
        //.etc.
But this method is deprecated in Flink 1.1.4. Does anybody know the non-deprecated way to provide a type hint? The Flink Internals wiki only mentions returns(), but it was last updated over a year ago.
Your MyCustomSource should implement the ResultTypeQueryable interface to return the type to Flink as a TypeInformation.
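A minimal sketch of what that could look like, assuming the caller supplies the TypeInformation through the constructor. The constructor and the typeInfo field are my own additions for illustration, not something from the question. Because erasure means the class cannot discover OUT by itself at runtime, the type is passed in explicitly and handed back from getProducedType():

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

class MyCustomSource<OUT> extends RichSourceFunction<OUT>
        implements ResultTypeQueryable<OUT> {

    // Assumption: the caller knows the concrete type and passes it in.
    // TypeInformation is Serializable, so it can be shipped with the function.
    private final TypeInformation<OUT> typeInfo;

    MyCustomSource(TypeInformation<OUT> typeInfo) {
        this.typeInfo = typeInfo;
    }

    @Override
    public void run(SourceContext<OUT> sourceContext) throws Exception {
        OUT foo = null;
        // ... creates foo somehow, as in the question ...
        sourceContext.collect(foo);
    }

    @Override
    public void cancel() {
        // ...
    }

    // Flink asks the function for its output type here instead of
    // trying to extract it reflectively from the erased generic signature.
    @Override
    public TypeInformation<OUT> getProducedType() {
        return typeInfo;
    }
}
```

With this in place, something like env.addSource(new MyCustomSource<String>(BasicTypeInfo.STRING_TYPE_INFO)) should work without a returns() call. BasicTypeInfo.STRING_TYPE_INFO is just one way to obtain a TypeInformation<String>; for other types you would need to build the matching TypeInformation yourself.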