I'm using Flink to process data coming from a data source (such as Kafka, Pravega, etc.).
In my case, the data source is Pravega, which provides a Flink connector.
The data source sends me JSON data like this:
{"key": "value"}
{"key": "value2"}
{"key": "value3"}
Here is my piece of code:
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
        // [...]
        .build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.map(new MapFunction<ObjectNode, String>() {
            @Override
            public String map(ObjectNode node) throws Exception {
                return node.toString();
            }
        })
        .keyBy("word") // ERROR
As you can see, I used the FlinkPravegaReader and a proper deserializer to read the JSON stream coming from Pravega.
Then I try to transform the JSON data into a String, keyBy it, and count the results.
However, I get an error:
The program finished with the following exception:
Field expression must be equal to '*' or '_' for non-composite types.
It seems that keyBy threw this exception.
I'm not a Flink expert, so I don't know why. I've read the source code of the official WordCount example. In that example, there is a custom splitter, which is used to split the String data into words.
So I'm wondering whether I need some kind of splitter in this case too. If so, what kind of splitter should I use? Can you show me an example? If not, why did I get this error, and how do I solve it?
I guess you have read the documentation about how to specify keys.
The example code uses keyBy("word") because word is a field of the POJO type WC:
// some ordinary POJO (Plain Old Java Object)
public class WC {
    public String word;
    public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
In your case, you put a map operator before keyBy, and the output of this map operator is a String, which is not a composite type. So there is obviously no word field in your case. If you actually want to group this String stream, you need to write it like this: .keyBy(String::toString)
Or you can even implement a customized KeySelector to generate your own key.
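To make the KeySelector idea more concrete, here is a minimal plain-Java sketch of what "select a key, then count per key" amounts to. It rests on some assumptions: the KeySelector interface below is a local stand-in with the same shape as Flink's org.apache.flink.api.java.functions.KeySelector, declared only so the sketch compiles without Flink on the classpath. The names KeySelectorSketch, WholeStringKey and countByKey are hypothetical, and the loop only models the grouping on an in-memory list. It is not how Flink executes a keyed stream.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeySelectorSketch {

    // Local stand-in mirroring the shape of Flink's KeySelector interface.
    // In a real job you would import the Flink interface instead.
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    // Hypothetical selector: the whole string is the key, which is the same
    // key that .keyBy(String::toString) would produce.
    static class WholeStringKey implements KeySelector<String, String> {
        @Override
        public String getKey(String value) {
            return value;
        }
    }

    // Plain-Java model of "keyBy, then count": one counter per distinct key.
    static <T, K> Map<K, Long> countByKey(List<T> elements, KeySelector<T, K> selector) {
        Map<K, Long> counts = new LinkedHashMap<>();
        for (T element : elements) {
            K key;
            try {
                key = selector.getKey(element);
            } catch (Exception e) {
                throw new RuntimeException("key extraction failed", e);
            }
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "{\"key\":\"value\"}",
                "{\"key\":\"value2\"}",
                "{\"key\":\"value\"}");
        // Prints each distinct JSON string together with how often it occurred.
        System.out.println(countByKey(records, new WholeStringKey()));
    }
}
```

In the actual job, a class implementing Flink's KeySelector<String, String> would be passed to keyBy in place of the field name. If you want to group by the value of the JSON "key" field rather than by the whole string, one option is to apply a selector to the ObjectNode stream before the map operator, since the node still exposes its fields at that point.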