I am trying to define a UDF in Spark (2.0) from a string containing a Scala function definition. Here is the snippet:
val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val toolbox = currentMirror.mkToolBox()
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show
This gives me an error:
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
However, when I define the UDF as:
val f = udf((s: String) => 5)
it works just fine. What is the issue here? The end objective is to take a string containing the definition of a Scala function and use it as a UDF.
As Giovanny observed, the problem lies in the class loaders being different (you can investigate this by calling .getClass.getClassLoader on the objects involved). Then, when the workers try to deserialize your reflected function, all hell breaks loose.
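For example, a quick check you can run in spark-shell (assuming the toolbox from the snippet above is still in scope) makes the mismatch visible by printing the two class loaders:
// An ordinary lambda is loaded by the REPL/application class loader...
val plain = (s: String) => 5
println(plain.getClass.getClassLoader)
// ...while the toolbox-compiled one lives in the toolbox's own class loader,
// which the executor JVMs know nothing about.
val reflected = toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
println(reflected.getClass.getClassLoader)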
Here is a solution that does not involve any class loader hackery. The idea is to move the reflection step to the workers. We end up having to redo the reflection, but only once per worker. I think this is close to optimal: even if you did the reflection only once on the master node, you would still have to do a fair bit of work on each worker to get it to recognize the function.
val f = udf {
  new Function1[String, Int] with Serializable {
    import scala.reflect.runtime.universe._
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    lazy val toolbox = currentMirror.mkToolBox()

    lazy val func = {
      println("reflected function") // triggered at every worker
      toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
    }

    def apply(s: String): Int = func(s)
  }
}
Then, calling sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show works just fine.
Feel free to comment out the println; it is just an easy way of counting how many times the reflection happened. With spark-shell --master 'local' that's only once, but with spark-shell --master 'local[2]' it's twice.
The udf call itself runs immediately on the driver, but the function is not invoked until it reaches the worker nodes, so the lazy vals toolbox and func are only ever evaluated on the workers. Furthermore, since they are lazy, they are evaluated just once per worker.
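If the end goal is to build the UDF from an arbitrary code string, the same pattern can be wrapped in a small helper. The sketch below is illustrative rather than definitive: compileStringUdf is a made-up name, and it assumes the string always evaluates to a String => Int (generalizing the argument and return types would need extra type plumbing):
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical helper that applies the worker-side reflection pattern to any code string.
def compileStringUdf(code: String) = udf {
  new Function1[String, Int] with Serializable {
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    // Lazy, so the toolbox and the compiled function are built on each worker, once.
    lazy val toolbox = currentMirror.mkToolBox()
    lazy val func = toolbox.eval(toolbox.parse(code)).asInstanceOf[String => Int]

    def apply(s: String): Int = func(s)
  }
}

val g = compileStringUdf("(s:String) => s.length + 5")
sc.parallelize(Seq("1","5")).toDF.select(g(col("value"))).show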