I am using Spark 1.6.1 and Python. How can I enable Kryo serialization when working with PySpark?
I have the following settings in the spark-defaults.conf file:
spark.eventLog.enabled true
spark.eventLog.dir //local_drive/sparkLogs
spark.default.parallelism 8
spark.locality.wait.node 5s
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.classesToRegister Timing, Join, Select, Predicate, Timeliness, Project, Query2, ScanSelect
spark.shuffle.compress true
And I get the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
: org.apache.spark.SparkException: Failed to register classes with Kryo
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:128)
at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:273)
at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:258)
at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:174)
Caused by: java.lang.ClassNotFoundException: Timing
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:120)
at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:120)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:120)
The main class contains (Query2.py):
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from Timing import Timing
from Predicate import Predicate
from Join import Join
from ScanSelect import ScanSelect
from Select import Select
from Timeliness import Timeliness
from Project import Project
conf = SparkConf().setMaster(master).setAppName(sys.argv[1]).setSparkHome("$SPARK_HOME")
# Settings must be applied before the SparkContext is created; later changes to conf are ignored.
conf.set("spark.kryo.registrationRequired", "true")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
I know that "Kryo won’t make a major impact on PySpark because it just stores data as byte[] objects, which are fast to serialize even with Java. But it may be worth a try to set spark.serializer and not try to register any classes" (Matei Zaharia, 2014). However, I need to register the classes.
Thanks in advance.
It is not possible. Kryo is a Java (JVM) serialization framework and cannot be used with Python classes. To serialize Python objects, PySpark uses Python serialization tools, including the standard pickle module and an improved version of cloudpickle. You can find some additional information about PySpark serialization in "Tips for properly using large broadcast variables?".
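To make the point concrete, here is a minimal sketch using only the standard pickle module. The Timing class below is a hypothetical stand-in for the one in the question (its fields are invented for illustration), and the snippet does not call Spark at all. It only shows that a Python object becomes a plain byte string on the Python side, which is the form in which the JVM would see it.

```python
import pickle


class Timing(object):
    """Hypothetical stand-in for the Timing class from the question."""

    def __init__(self, label, seconds):
        self.label = label
        self.seconds = seconds


original = Timing("scan", 1.5)

# Serialize on the Python side; the result is an opaque byte string.
payload = pickle.dumps(original, protocol=2)

# Deserialize again, as a Python worker would do on the receiving end.
restored = pickle.loads(payload)

print(type(payload).__name__, restored.label, restored.seconds)
```

Nothing in this round trip involves a JVM class named Timing, which is why listing it in spark.kryo.classesToRegister ends in a ClassNotFoundException.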
So while you can enable Kryo serialization when working with PySpark, this won't affect how Python objects are serialized. It will be used only for serialization of Java or Scala objects.
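If you still want Kryo for the JVM-side data, a setup along these lines should be enough. This is a sketch, not a tested configuration for your cluster: make_context and KRYO_SETTINGS are hypothetical names, and the "local[*]" default master is an assumption. The key point is that spark.serializer is set and spark.kryo.classesToRegister is left out.

```python
# Only the serializer is set; spark.kryo.classesToRegister is deliberately left out,
# because entries there must be JVM class names, not Python classes.
KRYO_SETTINGS = [
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
]


def make_context(app_name, master="local[*]"):
    """Hypothetical helper: build a SparkContext with Kryo enabled for JVM-side data."""
    # Imported lazily so this module can be loaded without a Spark installation.
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster(master).setAppName(app_name)
    # All settings go on the conf before the context is created.
    conf.setAll(KRYO_SETTINGS)
    return SparkContext(conf=conf)
```

The same effect can be had by keeping the spark.serializer line in the configuration file and removing the spark.kryo.classesToRegister line.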