Tags: json, apache-flink, flink-streaming

How to split the data of ObjectNode in Apache Flink


I'm using Flink to process data coming from a data source (such as Kafka or Pravega).

In my case, the data source is Pravega, which provides a Flink connector.

My data source sends me JSON data like this:

{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...

Here is my piece of code:

PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
        @Override
        public String map(ObjectNode node) throws Exception {
            return node.toString();
        }
    })
    .keyBy("word")    // ERROR
    .timeWindow(Time.seconds(10))
    .sum("count");

As you can see, I used FlinkPravegaReader with a suitable deserializer to read the JSON stream coming from Pravega.

Then I try to transform the JSON data into a String, key the stream with keyBy, and count the records.

However, I get an error:

 The program finished with the following exception:

Field expression must be equal to '*' or '_' for non-composite types.
        org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
        org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
        myflink.StreamingJob.main(StreamingJob.java:114)

It seems that keyBy threw this exception.

I'm not a Flink expert, so I don't know why. I've read the source code of the official WordCount example. That example has a custom splitter, which splits the String data into words.

So I'm wondering whether I need some kind of splitter in this case too. If so, what kind of splitter should I use? Can you show me an example? If not, why did I get this error and how do I solve it?


Solution

  • I guess you have read the documentation on how to specify keys:

    Specify keys

    The example code uses keyBy("word") because word is a field of the POJO type WC.

    // some ordinary POJO (Plain old Java Object)
    public class WC {
      public String word;
      public int count;
    }
    DataStream<WC> words = // [...]
    DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
    

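    In your case, you put a map operator before keyBy, and the output of that map operator is a String. A String is not a composite type, so it has no word field, and that is why keyBy("word") fails with "Field expression must be equal to '*' or '_' for non-composite types". If you actually want to group this String stream, write .keyBy(String::toString) instead. Presumably sum("count") needs the same attention, since a String has no count field either.

    Alternatively, you can implement a custom KeySelector to generate your own key.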

    Customized Key Selector
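
To make the key selector idea concrete, here is a minimal plain-Java sketch that runs without a Flink cluster. It is not Flink code: the class KeySelectorSketch, the constant BY_KEY_FIELD and the helper countByKey are hypothetical names made up for illustration, and each record is modeled as a Map<String, String> standing in for an ObjectNode. The point is only that a key selector is a function from a record to its key, and that keying followed by a count groups records by that function's result. In a real job you would put the same extraction logic into the getKey method of Flink's KeySelector interface and pass that to keyBy.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeySelectorSketch {

    // Hypothetical stand-in for a Flink KeySelector: maps one record to its grouping key.
    // A record is modeled as Map<String, String> in place of an ObjectNode (assumption).
    public static final Function<Map<String, String>, String> BY_KEY_FIELD =
            record -> record.get("key");

    // Groups records by the selected key and counts each group.
    // Conceptually this mirrors keying a stream and counting per key;
    // it does not model windows or streaming.
    public static <T, K> Map<K, Long> countByKey(List<T> records, Function<T, K> keySelector) {
        Map<K, Long> counts = new LinkedHashMap<>();
        for (T record : records) {
            counts.merge(keySelector.apply(record), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map<String, String>> records = Arrays.asList(
                Collections.singletonMap("key", "value"),
                Collections.singletonMap("key", "value2"),
                Collections.singletonMap("key", "value"));

        // prints {value=2, value2=1}
        System.out.println(countByKey(records, BY_KEY_FIELD));
    }
}
```

Keying on the ObjectNode itself, rather than on the String produced by map, keeps the JSON fields available to the selector, so the map to String can be dropped or moved after the keying step.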