Search code examples
scalaapache-sparkapache-spark-mllibkryospark3

Spark 3 KryoSerializer issue - Unable to find class: org.apache.spark.util.collection.OpenHashMap


I am upgrading a Spark 2.4 project to Spark 3.x. We are hitting a snag with some existing Spark-ml code:

var stringIndexers = Array[StringIndexer]()
for (featureColumn <- FEATURE_COLS) {
    stringIndexers = stringIndexers :+ new StringIndexer().setInputCol(featureColumn).setOutputCol(featureColumn + "_index")
}
val pipeline = new Pipeline().setStages(stringIndexers)
val dfWithNumericalFeatures = pipeline.fit(decoratedDf).transform(decoratedDf)

Specifically, this line: val dfWithNumericalFeatures = pipeline.fit(decoratedDf).transform(decoratedDf) now results in this cryptic exception in Spark 3:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 238.0 failed 1 times, most recent failure: Lost task 0.0 in stage 238.0 (TID 5589) (executor driver): com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.spark.util.collection.OpenHashMap$mcJ$sp$$Lambda$13346/2134122295
[info] Serialization trace:
[info] org$apache$spark$util$collection$OpenHashMap$$grow (org.apache.spark.util.collection.OpenHashMap$mcJ$sp)
[info]  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:156)
[info]  at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
[info]  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
[info]  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
[info]  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
[info]  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
[info]  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
[info]  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
[info]  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
[info]  at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:397)
[info]  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
[info]  at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.deserialize(TypedAggregateExpression.scala:271)
[info]  at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.merge(interfaces.scala:568)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$3(AggregationIterator.scala:199)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$3$adapted(AggregationIterator.scala:199)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7(AggregationIterator.scala:213)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7$adapted(AggregationIterator.scala:207)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:151)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:77)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:107)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:85)
[info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885)
[info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:885)
[info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
[info]  at org.apache.spark.scheduler.Task.run(Task.scala:131)
[info]  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
[info]  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]  at java.lang.Thread.run(Thread.java:750)
[info] Caused by: java.lang.ClassNotFoundException: org.apache.spark.util.collection.OpenHashMap$mcJ$sp$$Lambda$13346/2134122295
[info]  at java.lang.Class.forName0(Native Method)
[info]  at java.lang.Class.forName(Class.java:348)
[info]  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
[info]  ... 36 more

I have searched around and the only relevant issue I've found is this un-answered SO question with the same issue: Spark Kryo Serialization issue.

OpenHashMap is not used in my code, seems likely that there is a bug with the KryoSerializer during this Pipeline.fit() function. Any ideas how to get around this? Thanks!

EDIT: I also just attempted removing usage of the KryoSerializer during my unit tests:

spark = SparkSession
      .builder
      .master("local[*]")
      .appName("UnitTest")
      .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      .config("spark.driver.bindAddress", "127.0.0.1")
      .getOrCreate()

Confirmed that I am using the JavaSerializer: println(spark.conf.get("spark.serializer")) outputs org.apache.spark.serializer.JavaSerializer. Still same issue however, even when not using the KryoSerializer.


Solution

  • Try to change sparkVersion.
    Had the same problem with version 3.1.0
    Changed to 3.3.2