java, apache-kafka, apache-flink, flink-streaming

How to deserialize JSON data from Kafka in Flink when its structure is not known in advance


I want to process the data in Kafka with Flink, but the problem is that the JSON data in Kafka can vary in structure.

like this:

{"data":{"template":25,"name":"name"}}

or this:

{"data"{"type":"type1","props":"props","strem":"stream1"}

And I can't know in advance which fields the JSON will contain. So there is a problem when using Flink:

streamExecutionEnvironment.addSource(new FlinkKafkaConsumer010<KafkaEvent>("flink", new KafkaEventSchema(),
            kafkaProps))
            .flatMap(new KafkaEventToRow()).returns(getReturnType());

So how should the POJO type and the MapFunction be defined when the JSON data looks like this?


Solution

  • You have to define a more generic deserialization schema, e.g. one that produces a Map instead of a fixed POJO

    Define the Schema

    import java.io.IOException;
    import java.util.Map;
    
    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    // In older Flink versions this interface lives in org.apache.flink.streaming.util.serialization
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    
    class CustomSchema implements DeserializationSchema<Map<String, Object>> {
    
        private final ObjectMapper mapper = new ObjectMapper();
    
        @Override
        public Map<String, Object> deserialize(byte[] bytes) throws IOException {
            // Parse the raw bytes into a generic key/value map instead of a fixed POJO
            return mapper.readValue(bytes, new TypeReference<Map<String, Object>>() {});
        }
    
        @Override
        public boolean isEndOfStream(Map<String, Object> nextElement) {
            // The Kafka topic is treated as an unbounded stream
            return false;
        }
    
        @Override
        public TypeInformation<Map<String, Object>> getProducedType() {
            return TypeInformation.of(new TypeHint<Map<String, Object>>() {});
        }
    }
    

    Now use this as the Schema

    streamExecutionEnvironment
    .addSource(new FlinkKafkaConsumer010<Map<String, Object>>("flink", new CustomSchema(),......
    

    Now you get a generic Map which can in turn contain any data structure.
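
    Since the payload is now just a Map, the flatMap from the question can check which keys are actually present and emit whatever it needs. Below is a minimal sketch of such a function; the field names come from the two example messages above, and the three-field Row layout is an assumption for illustration, not part of the original answer.

    import java.util.Map;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    
    class KafkaEventToRow implements FlatMapFunction<Map<String, Object>, Row> {
    
        @Override
        public void flatMap(Map<String, Object> value, Collector<Row> out) {
            // "data" is the outer key used in both example messages
            Object data = value.get("data");
            if (!(data instanceof Map)) {
                return; // skip malformed records
            }
            @SuppressWarnings("unchecked")
            Map<String, Object> fields = (Map<String, Object>) data;
    
            // Branch on whichever optional fields are present in this record
            Row row = new Row(3);
            if (fields.containsKey("template")) {
                row.setField(0, "template");
                row.setField(1, fields.get("template"));
                row.setField(2, fields.get("name"));
            } else {
                row.setField(0, fields.get("type"));
                row.setField(1, fields.get("props"));
                row.setField(2, fields.get("stream"));
            }
            out.collect(row);
        }
    }

    Because Row carries no compile-time field information, the .returns(...) call from the question still has to supply explicit type information for the downstream operators (e.g. a RowTypeInfo).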