I am using Flink 1.12.0, and I have the following simple test case.
I defined two model classes (AbstractDataModel is the super type, while ConcreteModel is the subtype):
public interface AbstractDataModel {
    public String getValue();
}

public class ConcreteModel implements AbstractDataModel {
    private String key;
    private String value;

    public ConcreteModel() {
    }

    public ConcreteModel(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}
Then I define a simple application as follows, which maps the ConcreteModel elements to strings. The MapFunction uses the super type AbstractDataModel as its input type, but there is a compile error complaining:
Required type:
MapFunction<com.ConcreteModel,java.lang.String>
Provided:
MyMapFunction
How can I fix this problem if I still want to use AbstractDataModel as the generic type in the MapFunction?
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
class MyMapFunction implements MapFunction<AbstractDataModel, String> {
    @Override
    public String map(AbstractDataModel model) throws Exception {
        return model.getValue();
    }
}
public class ConcreteModelTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.registerType(ConcreteModel.class);
        // env.registerType(AbstractDataModel.class);
        DataStream<String> ds = env
                .fromElements(new ConcreteModel("a", "1"), new ConcreteModel("b", "2"))
                .map(new MyMapFunction()); // <- compile error here
        ds.print();
        env.execute();
    }
}
That happens basically because Flink cannot analyze these classes as POJO types for its distributed serialization and falls back to treating them as a GenericType. Here is what the log says:
15:45:51,460 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class … cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on “Data Types & Serialization” for details of the effect on performance.
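You can actually see this classification without running a job by asking the type system directly. This is only a minimal sketch, assuming the two model classes above are on the classpath; the class name TypeCheck is just for illustration:

import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TypeCheck {
    public static void main(String[] args) {
        // ConcreteModel has a no-arg constructor plus getters/setters for every field,
        // so Flink should report it as a PojoType.
        System.out.println(TypeInformation.of(ConcreteModel.class));
        // AbstractDataModel is an interface with no fields, so Flink should fall back
        // to a GenericType that is handled by Kryo.
        System.out.println(TypeInformation.of(AbstractDataModel.class));
    }
}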
You can implement the ResultTypeQueryable interface and define the produced type in the method public TypeInformation<String> getProducedType(). Since the map function outputs String, it should return TypeInformation.of(String.class).
This interface can be implemented by functions and input formats to tell the framework about their produced data type. This method acts as an alternative to the reflection analysis that is otherwise performed and is useful in situations where the produced data type may vary depending on parametrization.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
public class MyMapFunction implements MapFunction<AbstractDataModel, String>,
        ResultTypeQueryable<String> {

    @Override
    public String map(AbstractDataModel value) throws Exception {
        return value.getValue();
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Tell Flink explicitly what this function produces, instead of relying
        // on the reflection-based type extraction.
        return TypeInformation.of(String.class);
    }
}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConcreteModelTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Declare the elements as the super type so that fromElements produces
        // a DataStream<AbstractDataModel> and MyMapFunction can be applied.
        AbstractDataModel concreteModel01 = new ConcreteModel("a", "1");
        AbstractDataModel concreteModel02 = new ConcreteModel("a", "2");
        DataStream<String> ds = env
                .fromElements(concreteModel01, concreteModel02)
                .map(new MyMapFunction());
        ds.print();
        env.execute();
    }
}
Or, an easier way is to just call map with TypeInformation.of(String.class). Then you don't need to implement ResultTypeQueryable in MyMapFunction.
DataStream<String> ds = env
        .fromElements(concreteModel01, concreteModel02)
        .map(new MyMapFunction(), TypeInformation.of(String.class));
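For completeness, here is the same ConcreteModelTest wired up with this overload; it only recombines the pieces shown above (the elements are still declared as AbstractDataModel, and MyMapFunction no longer needs ResultTypeQueryable):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConcreteModelTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        AbstractDataModel concreteModel01 = new ConcreteModel("a", "1");
        AbstractDataModel concreteModel02 = new ConcreteModel("a", "2");
        // Passing the output TypeInformation here replaces the reflection-based
        // extraction, so no ResultTypeQueryable is required on MyMapFunction.
        DataStream<String> ds = env
                .fromElements(concreteModel01, concreteModel02)
                .map(new MyMapFunction(), TypeInformation.of(String.class));
        ds.print();
        env.execute();
    }
}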
And then just use your interface with its class implementation; the AbstractDataModel interface and the ConcreteModel class stay exactly as you defined them in the question.