I'm using Flink to process JSON data coming from a data source.
For now, my job is quite simple: extract each field from the JSON data and print it to the log file.
Here is my code:
// create a deserializer that deserializes the JSON-format data into ObjectNode
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
// create connector to receive data from Pravega
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.???.print();
Suppose the data coming from Pravega looks like this: {"name":"titi", "age":18}
As I said, for now I simply need to extract name and age and print them.
How can I do this?
As I understand it, I need to put some custom code at the ??? placeholder. I might need to create a custom POJO class that contains the ObjectNode, but I don't know how. I've read the official Flink documentation and searched for how to create a custom POJO for Flink, but I still can't figure it out.
Could you please show me an example?
Why don't you use something more suitable than JavaSerializer? Perhaps something from here.
You could then create a POJO with the fields you need and deserialize the JSON data directly into your POJO instead of ObjectNode.
If there is a specific reason you need ObjectNode at deserialization time, you can do something like this:
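// I assume you have created a class named MyPojo
dataStream.map(new MapFunction<ObjectNode, MyPojo>() {
    // Jackson mapper used to bind the JSON tree to the POJO
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public MyPojo map(final ObjectNode value) throws Exception {
        // treeToValue converts the tree directly; asText() returns an empty string for object nodes
        return mapper.treeToValue(value, MyPojo.class);
    }
}).print();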
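Since the question asks what such a POJO looks like, here is a minimal sketch of the MyPojo class assumed above. The class name and its two fields are hypothetical, chosen only to match the sample record {"name":"titi", "age":18}; adjust them to your real data.

```java
// Hypothetical POJO matching the sample record {"name":"titi", "age":18}.
// Flink treats a class as a POJO when it is public, has a public
// no-argument constructor, and every field is either public or
// reachable through a getter and a setter.
public class MyPojo {
    private String name;
    private int age;

    // No-argument constructor, needed by Flink's POJO rules and by
    // Jackson's default deserialization.
    public MyPojo() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    // A readable toString() makes printed records easy to inspect.
    @Override
    public String toString() {
        return "MyPojo{name='" + name + "', age=" + age + "}";
    }
}
```

With a class like this, the JSON field names map onto the setters by name, so no extra annotations should be needed for the sample record.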