Tags: java, apache-spark, lambda, closures, kryo

Configure function/lambda serialization in Spark


How do I configure Spark to use KryoSerializer for lambdas? Or have I found a bug in Spark? We have no issue with the serialization of the data elsewhere, just these lambdas where it uses the default instead of Kryo.

Here's the code:

JavaPairRDD<String, IonValue> rdd; // provided
IonSexp filterExpression; // provided
Function<Tuple2<String, IonValue>, Boolean> filterFunc = record -> myCustomFilter(filterExpression, record);
rdd = rdd.filter(filterFunc);

Exception thrown:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:388)
    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.filter(RDD.scala:387)
    at org.apache.spark.api.java.JavaPairRDD.filter(JavaPairRDD.scala:99)
    at com.example.SomeClass.process(SomeClass.java:ABC)
    {more stuff}
Caused by: java.io.NotSerializableException: com.amazon.ion.impl.lite.IonSexpLite
Serialization stack:
    - object not serializable (class: com.amazon.ion.impl.lite.IonSexpLite, value: (and (equals (literal 1) (path marketplace_id)) (equals (literal 351) (path product gl_product_group))))
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 2)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.example.SomeClass, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial com/example/SomeClass.lambda$process$8f20a2d2$1:(Lcom/amazon/ion/IonSexp;Lscala/Tuple2;)Ljava/lang/Boolean;, instantiatedMethodType=(Lscala/Tuple2;)Ljava/lang/Boolean;, numCaptured=2])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class com.example.SomeClass$$Lambda$36/263969036, com.example.SomeClass$$Lambda$36/263969036@31880efa)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$filter$1, name: f$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$filter$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 18 more

The problematic filterExpression in this case is an Ion S-expression object, which does not implement java.io.Serializable. We're using the Kryo serializer and have registered and configured it so that it serializes these objects just fine.

Code when we initialize our spark configuration:

sparkConf = new SparkConf().setAppName("SomeAppName").setMaster("MasterLivesHere")
        .set("spark.serializer", KryoSerializer.class.getCanonicalName())
        .set("spark.kryo.registrator", KryoRegistrator.class.getCanonicalName())
        .set("spark.kryo.registrationRequired", "false");

Code in our registrator:

kryo.register(com.amazon.ion.IonSexp.class);
kryo.register(Class.forName("com.amazon.ion.impl.lite.IonSexpLite"));

If I try to manually serialize that lambda with the code below:

SerializationUtils.serialize(filterFunc);

It fails with the same error, as expected, since filterExpression is not serializable. However, the code below works:

sparkContext.env().serializer().newInstance().serialize(filterFunc, ClassTag$.MODULE$.apply(filterFunc.getClass()));

Which, again, is as expected since our Kryo set-up is able to handle these objects.
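The difference between the two calls above can be reproduced in plain Java, without Spark: when a lambda targets a Serializable functional interface, Java serialization replaces it with a java.lang.invoke.SerializedLambda via writeReplace and then tries to serialize each captured argument, which is exactly where the NotSerializableException comes from. A minimal, self-contained sketch (SerFunction and FilterExpression are hypothetical stand-ins for Spark's Function and IonSexpLite, not real Spark or Ion types):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LambdaCaptureDemo {
    // Stand-in for org.apache.spark.api.java.function.Function,
    // which also extends Serializable.
    interface SerFunction<T, R> extends Serializable {
        R apply(T t);
    }

    // Stand-in for IonSexpLite: captured by the lambda, not Serializable.
    static class FilterExpression {
        final int literal = 1;
    }

    public static void main(String[] args) throws IOException {
        FilterExpression filterExpression = new FilterExpression();
        SerFunction<Integer, Boolean> filterFunc =
                n -> n == filterExpression.literal; // captures filterExpression

        try (ObjectOutputStream oos =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(filterFunc); // writeReplace -> SerializedLambda
            System.out.println("serialized fine");
        } catch (NotSerializableException e) {
            // Java serialization walks SerializedLambda.capturedArgs and
            // fails here, just as in the stack trace above.
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```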

So my question/confusion is, why does Spark try to serialize that lambda with org.apache.spark.serializer.JavaSerializer when we've clearly configured it to use Kryo?


Solution

  • After some more digging, it turns out there is indeed a different serializer used for closures: the closure serializer is hard-coded to the default JavaSerializer because of an unresolved bug with Kryo closure serialization.

    This answer does a decent job explaining it: https://stackoverflow.com/a/40261550/2158122

    I was able to solve my particular problem using broadcasts, though.

    Here's what my code looks like now:

    JavaSparkContext sparkContext; // provided
    JavaPairRDD<String, IonValue> rdd; // provided
    IonSexp filterExpression; // provided
    
    Broadcast<IonSexp> filterExprBroadcast = sparkContext.broadcast(filterExpression);
    rdd = rdd.filter(record -> myCustomFilter(filterExprBroadcast.value(), record));
    filterExprBroadcast.destroy(false); // Only do this after an action is executed
    

    Broadcast values are serialized like RDD data rather than as part of a closure, so this path does use the configured Kryo serializer.
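Another option, for cases where a broadcast is awkward, is to avoid capturing the non-serializable object at all: capture a serializable form and rebuild the object on the executor. With ion-java that could mean capturing the expression's Ion text and re-parsing it inside the lambda with an IonSystem (whether the text form round-trips for your values is an assumption to verify). The underlying Java pattern is a serializable envelope with custom writeObject/readObject, sketched here with a hypothetical Expr stand-in rather than real Ion types:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class EnvelopeDemo {
    // Stand-in for a non-Serializable type like IonSexpLite.
    static class Expr {
        final String text;
        Expr(String text) { this.text = text; }
    }

    // Serializable envelope: writes only the text form, rebuilds the
    // Expr on the reading side.
    static class ExprEnvelope implements Serializable {
        private transient Expr expr;   // never serialized directly
        private String textForm;       // what actually crosses the wire

        ExprEnvelope(Expr expr) { this.expr = expr; }

        Expr get() { return expr; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            textForm = expr.text;      // capture a serializable representation
            out.defaultWriteObject();
        }

        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            expr = new Expr(textForm); // "re-parse" on the receiving side
        }
    }

    public static void main(String[] args) throws Exception {
        ExprEnvelope env = new ExprEnvelope(new Expr("(and (equals 1 x))"));

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(env);      // succeeds: Expr itself is never written
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            ExprEnvelope back = (ExprEnvelope) ois.readObject();
            System.out.println(back.get().text);
        }
    }
}
```

The lambda would then capture the envelope, which Java serialization can handle, and call get() inside the closure, so the closure serializer never sees the non-serializable object.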