I'm using Flink to process data coming from a data source (such as Kafka or Pravega).
In my case, the data source is Pravega, which provides a Flink connector.
The data source sends me JSON data like this:
{"key": "value"}
{"key": "value2"}
{"key": "value3"}
...
...
Here is my piece of code:
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
        @Override
        public String map(ObjectNode node) throws Exception {
            return node.toString();
        }
    })
    .keyBy("word") // ERROR
    .timeWindow(Time.seconds(10))
    .sum("count");
As you can see, I used the FlinkPravegaReader and a proper deserializer to get the JSON stream coming from Pravega.
Then I try to transform the JSON data into a String, keyBy it, and count the records.
However, I get an error:
The program finished with the following exception:
Field expression must be equal to '*' or '_' for non-composite types.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:340)
myflink.StreamingJob.main(StreamingJob.java:114)
It seems that keyBy threw this exception.
Well, I'm not a Flink expert, so I don't know why. I've read the source code of the official WordCount example. In that example, there is a custom splitter, which is used to split the String data into words.
So I'm wondering whether I need some kind of splitter in this case too. If so, what kind of splitter should I use? Can you show me an example? If not, why did I get this error, and how do I solve it?
I guess you have read the documentation on how to specify keys. The example code uses keyBy("word") because word is a field of the POJO type WC.
// some ordinary POJO (Plain Old Java Object)
public class WC {
    public String word;
    public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
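To make the role of the field name concrete, here is a minimal plain-Java sketch of what keying on word and summing count amounts to. It deliberately uses only the standard library and is an analogy, not the Flink API: PojoKeySketch and sumCountByWord are hypothetical names introduced for illustration, and the WC class just mirrors the POJO above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PojoKeySketch {

    // Same shape as the WC POJO above: public fields that can be addressed by name.
    public static class WC {
        public String word;
        public int count;

        public WC() {}

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Plain-Java stand-in (an assumption, not Flink code) for keying on "word"
    // and summing "count": group on the word field, then add up count per group.
    public static Map<String, Integer> sumCountByWord(List<WC> records) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (WC wc : records) {
            totals.merge(wc.word, wc.count, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<WC> records = Arrays.asList(
            new WC("value", 1), new WC("value2", 1), new WC("value", 1));
        System.out.println(sumCountByWord(records)); // prints {value=2, value2=1}
    }
}
```

Both steps depend on the record type having named fields. A bare String has neither a word nor a count field, so a field expression has nothing to resolve against.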
In your case, you put a map operator before keyBy, and the output of this map operator is a String. A String is not a composite type, so it has no word field to key on, which is why the exception says the field expression must be '*' or '_'. If you actually want to group this String stream, you need to write .keyBy(String::toString) instead. Or you can implement a custom KeySelector to generate your own key.
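As a rough illustration of the key-selector idea, the sketch below groups String records by a key computed from each record. It is a plain-Java analogy that uses java.util.function.Function as a stand-in for a key selector, so it is not the Flink API itself. KeySelectorSketch, countByKey and extractKeyField are hypothetical names, and the regular expression is a naive assumption that only fits flat records like {"key": "value"}; a real job would more likely read the field from the ObjectNode.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class KeySelectorSketch {

    // Naive pattern for the flat {"key": "..."} records shown in the question.
    private static final Pattern KEY_FIELD =
        Pattern.compile("\"key\"\\s*:\\s*\"([^\"]*)\"");

    // Hypothetical helper: returns the value of the "key" field,
    // or an empty string if the record does not contain one.
    public static String extractKeyField(String json) {
        Matcher m = KEY_FIELD.matcher(json);
        return m.find() ? m.group(1) : "";
    }

    // Groups records by whatever key the selector computes and counts each group.
    public static <T, K> Map<K, Long> countByKey(List<T> records, Function<T, K> keySelector) {
        Map<K, Long> counts = new LinkedHashMap<>();
        for (T record : records) {
            counts.merge(keySelector.apply(record), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
            "{\"key\": \"value\"}",
            "{\"key\": \"value2\"}",
            "{\"key\": \"value\"}");

        // Key is the whole record, similar in spirit to String::toString.
        System.out.println(countByKey(records, Function.identity()));

        // Key is the extracted field value, similar in spirit to a custom selector.
        System.out.println(countByKey(records, KeySelectorSketch::extractKeyField));
    }
}
```

The first call treats every distinct JSON line as its own key, while the second groups by the field value only. Which one is appropriate depends on what you actually want to count.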