I want to process the data in Kafka with Flink, but the problem is that the JSON data in Kafka can vary in structure,
like this:
{"data":{"template":25,"name":"name"}}
or this:
{"data":{"type":"type1","props":"props","stream":"stream1"}}
And I can't know in advance how many fields the JSON contains, so there is a problem when using Flink:
streamExecutionEnvironment
    .addSource(new FlinkKafkaConsumer010<KafkaEvent>("flink", new KafkaEventSchema(), kafkaProps))
    .flatMap(new KafkaEventToRow())
    .returns(getReturnType());
So how should the POJO type and the MapFunction be defined when the JSON data looks like this?
You have to define a more generic DeserializationSchema, for example one that produces a Map.
Define the schema:
import java.io.IOException;
import java.util.Map;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.fasterxml.jackson.databind.ObjectMapper;

class CustomSchema implements DeserializationSchema<Map<String, Object>> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    @SuppressWarnings("unchecked")
    public Map<String, Object> deserialize(byte[] bytes) throws IOException {
        // Jackson parses arbitrary JSON objects into nested Maps/Lists
        return mapper.readValue(bytes, Map.class);
    }

    @Override
    public boolean isEndOfStream(Map<String, Object> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Map<String, Object>> getProducedType() {
        return TypeInformation.of(new TypeHint<Map<String, Object>>() {});
    }
}
Now use this as the schema:
streamExecutionEnvironment
    .addSource(new FlinkKafkaConsumer010<Map<String, Object>>("flink", new CustomSchema(),......
Now you get a generic Map which can in turn contain any data structure.
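Downstream operators then have to look fields up defensively, since any key may be absent. A minimal sketch of such a lookup (class and method names are illustrative, and the nested maps here just simulate what `mapper.readValue(bytes, Map.class)` would produce for the two payloads in the question; no Flink dependency is needed to show the idea):

```java
import java.util.HashMap;
import java.util.Map;

public class GenericPayload {

    // Null-safe lookup of a field inside the "data" wrapper object
    // from the question's JSON; returns null when the field is absent.
    static String getField(Map<String, ?> payload, String field) {
        Object data = payload.get("data");
        if (!(data instanceof Map)) {
            return null;
        }
        Object value = ((Map<?, ?>) data).get(field);
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        // First payload shape: {"data":{"template":25,"name":"name"}}
        Map<String, Object> firstData = new HashMap<>();
        firstData.put("template", 25);
        firstData.put("name", "name");
        Map<String, Object> first = new HashMap<>();
        first.put("data", firstData);

        // Second payload shape: {"data":{"type":"type1",...}}
        Map<String, Object> secondData = new HashMap<>();
        secondData.put("type", "type1");
        Map<String, Object> second = new HashMap<>();
        second.put("data", secondData);

        System.out.println(getField(first, "template"));  // 25
        System.out.println(getField(first, "type"));      // null (field absent)
        System.out.println(getField(second, "type"));     // type1
    }
}
```

The same null checks would go inside your `KafkaEventToRow` flatMap when it converts the Map into rows.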