Error when run Flink job:
ClassCastException: cannot assign instance of java.util.LinkedHashMap to field org.apache.flink.runtime.jobgraph.JobVertex.results of type java.util.ArrayList in instance of org.apache.flink.runtime.jobgraph.InputOutputFormatVertex
Below is the source code
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ParameterTool params = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(params);
DataSet<String> text = env.readTextFile(params.get("input"));
DataSet<String> filtered = text.filter(new FilterFunction<String>()
{
public boolean filter(String value)
{
return value.startsWith("N");
}
});
DataSet<Tuple2<String, Integer>> tokenized = filtered.map(new Tokenizer());
DataSet<Tuple2<String, Integer>> counts = tokenized.groupBy(new int[] { 0 }).sum(1);
if (params.has("output"))
{
counts.writeAsText(params.get("output"));
env.execute("WordCount Example");
}
}
public static final class Tokenizer
implements MapFunction<String, Tuple2<String, Integer>>
{
public Tuple2<String, Integer> map(String value)
{
return new Tuple2(value, Integer.valueOf(1));
}
}
Error: Error image
You are right David Anderson, It was a version mismatch on my local machine, I have fixed this by upgrading my local Flink cluster version to latest version.