Search code examples
apache-sparkprotocol-bufferskryo

Kryo Serialization Issue with a collection in ProtoBuf field


I am receiving a protobuf object from Kafka in my Spark (v1.6.1) application that uses Kryo serializer. The protobuf object looks something like this -

  private A() {
          abc_ = "";
          xyz_ = "";
          ... some more fields
          aList_ = java.util.Collections.emptyList();
          ... some more fields
    }

When I run the spark application, it throws exception for the collection "aList_" and I get the following error :

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in stage 18.0 com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    aList_ (...packageName/...protoBufObject$A)
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
     at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:171)
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
     at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:58)
     at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:83)
     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
     at org.apache.spark.scheduler.Task.run(Task.scala:89)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.UnsupportedOperationException
         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:102)
         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
         ... 27 more

I see a similar issue in the below link but don't have a resolution yet.

Spark, Kryo Serialization Issue with ProtoBuf field

Has anybody else faced this issue?


Solution

  • In case anyone face this issue - I got it working using the method explained in my other post - How to set Unmodifiable collection serializer of Kryo in Spark code