Tags: hadoop, apache-spark, kryo

Kryo serialization error in Spark job


I want to use Kryo serialization in a Spark job.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SerializeTest {

    public static class Toto implements Serializable {
        private static final long serialVersionUID = 6369241181075151871L;
        private String a;

        public String getA() {
            return a;
        }

        public void setA(String a) {
            this.a = a;
        }
    }

    private static final PairFunction<Toto, Toto, Integer> WRITABLE_CONVERTOR = new PairFunction<Toto, Toto, Integer>() {
        private static final long serialVersionUID = -7119334882912691587L;

        @Override
        public Tuple2<Toto, Integer> call(Toto input) throws Exception {
            return new Tuple2<Toto, Integer>(input, 1);
        }
    };

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SerializeTest");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(new Class<?>[]{Toto[].class});
        JavaSparkContext context = new JavaSparkContext(conf);

        List<Toto> list = new ArrayList<Toto>();
        list.add(new Toto());
        JavaRDD<Toto> cursor = context.parallelize(list, list.size());

        JavaPairRDD<Toto, Integer> writable = cursor.mapToPair(WRITABLE_CONVERTOR);
        writable.saveAsHadoopFile(args[0], Toto.class, Integer.class, SequenceFileOutputFormat.class);

        context.close();
    }

}

But I get this error:

java.io.IOException: Could not find a serializer for the Key class: 'com.test.SerializeTest.Toto'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
    at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1179)
    at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1094)
    at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273)
    at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530)
    at org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/09/21 17:49:14 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Could not find a serializer for the Key class: 'com.test.SerializeTest.Toto'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
    (same stack trace as above)

Thanks.


Solution

  • This error is related neither to Spark nor to Kryo.

    When using Hadoop output formats, you need to make sure your key and value are instances of Writable. Hadoop doesn't use Java serialization by default (and you don't want to use it either, because it's very inefficient).

    You can check the io.serializations property in your Hadoop configuration; it lists the serializers in use, including org.apache.hadoop.io.serializer.WritableSerialization.

    To fix this issue, your Toto class must implement Writable. The same problem applies to Integer; use IntWritable instead.
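As a minimal sketch, the Writable contract means Toto needs a `write(DataOutput)` method, a matching `readFields(DataInput)` method, and a no-arg constructor. The real class would declare `implements org.apache.hadoop.io.Writable`; the declaration is commented out here only so the snippet compiles without Hadoop on the classpath.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Sketch of Toto following the Hadoop Writable contract. In the actual job
// this class would declare `implements org.apache.hadoop.io.Writable`.
// Hadoop instantiates keys reflectively, so a no-arg constructor is required.
class Toto /* implements Writable */ {
    private String a = "";

    public String getA() { return a; }

    public void setA(String a) {
        // writeUTF cannot handle null, so normalize to the empty string
        this.a = (a == null) ? "" : a;
    }

    // Serialize the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(a);
    }

    // Deserialize in the same order. Hadoop reuses key instances between
    // records, so this must fully overwrite the object's state.
    public void readFields(DataInput in) throws IOException {
        a = in.readUTF();
    }
}
```

With the value side switched to IntWritable as well, the pair function would then return something like `new Tuple2<>(toto, new IntWritable(1))`, and the save call becomes `writable.saveAsHadoopFile(args[0], Toto.class, IntWritable.class, SequenceFileOutputFormat.class)`.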