
ClassCastException when running a Flink job: cannot assign instance of java.util.LinkedHashMap to field org.apache.flink.runtime.jobgraph.JobVertex.results

I get this error when I run my Flink job:

ClassCastException: cannot assign instance of java.util.LinkedHashMap to field org.apache.flink.runtime.jobgraph.JobVertex.results of type java.util.ArrayList in instance of org.apache.flink.runtime.jobgraph.InputOutputFormatVertex

Below is the source code:

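import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

// The enclosing class is not shown in the post; the name WordCount is assumed.
public class WordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ParameterTool params = ParameterTool.fromArgs(args);

        // Read the input file line by line.
        DataSet<String> text = env.readTextFile(params.get("input"));

        // Keep only the lines that start with "N".
        DataSet<String> filtered = text.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) {
                return value.startsWith("N");
            }
        });

        // Map each line to (line, 1), then sum the counts per line.
        DataSet<Tuple2<String, Integer>> tokenized = filtered.map(new Tokenizer());
        DataSet<Tuple2<String, Integer>> counts = tokenized.groupBy(0).sum(1);

        if (params.has("output")) {
            // The sink call is missing from the post; writeAsCsv is an assumption.
            counts.writeAsCsv(params.get("output"), "\n", " ");
            env.execute("WordCount Example");
        }
    }

    public static final class Tokenizer
            implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String value) {
            return new Tuple2<>(value, 1);
        }
    }
}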


  • You are right, David Anderson. It was a version mismatch on my local machine. I fixed it by upgrading my local Flink cluster to the latest version.
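
Since the cause was a mismatch between the Flink version the job was built against and the version the cluster runs, a small pre-submit check can catch it early. The sketch below is only an illustration: FlinkVersionCheck, majorMinor and isCompatible are hypothetical names, not part of Flink, and the version strings are example values you would replace with the one from your build file and the one your cluster reports (for instance in its web UI). Treating an equal major.minor as "compatible" is an assumption; matching the exact version is the safer choice.

```java
// Hypothetical helper, not part of Flink: compares the "major.minor" part
// of the version the job was built against with the cluster's version.
final class FlinkVersionCheck {

    /** Returns the "major.minor" prefix of a version such as "1.14.2" or "1.15-SNAPSHOT". */
    static String majorMinor(String version) {
        // Split on dots and dashes so suffixes like "-SNAPSHOT" are ignored.
        String[] parts = version.trim().split("[.-]");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected version format: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    /** Assumption: versions with the same major.minor are treated as compatible. */
    static boolean isCompatible(String jobVersion, String clusterVersion) {
        return majorMinor(jobVersion).equals(majorMinor(clusterVersion));
    }

    public static void main(String[] args) {
        // Example values only; substitute your own versions.
        System.out.println(isCompatible("1.14.2", "1.14.0")); // prints true
        System.out.println(isCompatible("1.9.0", "1.14.0"));  // prints false
    }
}
```

If the check fails, either rebuild the job against the cluster's version or upgrade the cluster, which is what resolved the error here.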