apache-kafka, apache-flink

Apache Flink with Kafka: InvalidTypesException


I have the following code:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
                    "test-kafka-topic",
                    new SimpleStringSchema(),
                    properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
                    .map(new MapFunction<MyCustomClass,String>() {
                        @Override
                        public String map(MyCustomClass message) {
                            logger.info("--- Received message : " + message.toString());
                            return message.toString();
                        }
                    });

streamEnv.execute("Published messages");

MyCustomClassDeserializer is implemented to convert a byte array to a Java object.

When I run this program locally, I get this error:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Basic type expected.

It points to this line of code:

.map(new MapFunction<MyCustomClass,String>() {

I'm not sure why I'm getting this. What is wrong?


Solution

  • So, you have a deserializer that returns a POJO, yet you are telling Flink that it should deserialize the record from byte[] to String by using SimpleStringSchema. See the problem now? :)

    In general, I don't think you should use custom Kafka deserializers in FlinkKafkaConsumer. What you should aim for instead is to create a custom class that implements Flink's DeserializationSchema interface. It is much better in terms of type safety and testability. A minimal sketch follows.
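
    Here is one way such a schema could look. This is only a sketch: it assumes the Kafka records are JSON-encoded and uses Jackson to decode them, and the class name MyCustomClassSchema is a placeholder. In practice, the body of deserialize() should do whatever your existing MyCustomClassDeserializer already does.

    import java.io.IOException;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    // Hypothetical schema that turns raw Kafka record bytes into MyCustomClass.
    public class MyCustomClassSchema implements DeserializationSchema<MyCustomClass> {

        // static so it is not dragged into Flink's serialization of this schema
        private static final ObjectMapper MAPPER = new ObjectMapper();

        @Override
        public MyCustomClass deserialize(byte[] message) throws IOException {
            // Assumption: records are JSON; replace with your existing decoding logic.
            return MAPPER.readValue(message, MyCustomClass.class);
        }

        @Override
        public boolean isEndOfStream(MyCustomClass nextElement) {
            return false; // a Kafka topic is an unbounded stream
        }

        @Override
        public TypeInformation<MyCustomClass> getProducedType() {
            // This supplies the type information that was missing when
            // SimpleStringSchema claimed the stream contained Strings.
            return TypeInformation.of(MyCustomClass.class);
        }
    }

    You would then pass it to the consumer in place of SimpleStringSchema, and drop the value.deserializer property, since Flink's Kafka connector consumes raw bytes and applies this schema itself:

    FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer<>(
            "test-kafka-topic",
            new MyCustomClassSchema(),
            properties);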