I have following code:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());
FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
"test-kafka-topic",
new SimpleStringSchema(),
properties);
final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);
DataStream<String> stringStream = kafkaInputStream
.map(new MapFunction<MyCustomClass,String>() {
@Override
public String map(MyCustomClass message) {
logger.info("--- Received message : " + message.toString());
return message.toString();
}
});
streamEnv.execute("Published messages");
MyCustomClassDeserializer is implemented to convert byte array to java object.
When I run this program locally, I get error:
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Basic type expected.
And I get this for code line:
.map(new MapFunction<MyCustomClass,String>() {
Not sure why I get this?
So, You have a deserializer that returns POJO, yet You are telling Flink that it should deserialize record from byte[]
to String
by using SimpleStringSchema
.
See the problem now? :)
I don't think You should use the custom Kafka deserializers in FlinkKafkaConsumer
in general. What You should aim for instead is to instead create a custom class that extends DeserializationSchema
from Flink. It should be much better in terms of type safety and testability.