Search code examples
scalaapache-sparkapache-spark-sqludfscala-reflect

Define spark udf by reflection on a String


I am trying to define a udf in spark(2.0) from a string containing scala function definition.Here is the snippet:

val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val toolbox = currentMirror.mkToolBox()
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show

This gives me an error :

  Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
   at org.apache.spark.scheduler.Task.run(Task.scala:85)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)

However when I define the udf as :

val f = udf((s:String) => 5)

it works just fine. What is the issue here?The end objective is to take a string which has the defn of a scala function and use it as a udf.


Solution

  • As Giovanny observed, the problem lies in the class loaders being different (you can investigate this more by calling .getClass.getClassLoader on whatever object). Then, when the workers try to deserialize your reflected function, all hell breaks loose.

    Here is a solution that does not involve any class loader hackery. The idea is to move the reflection step to the workers. We'll end up having to redo the reflection step, but only once per worker. I think this is pretty optimal - even if you did the reflection only once on the master node, you would have to do a fair bit of work per worker to get them to recognize the function.

    val f = udf {
      new Function1[String,Int] with Serializable {
        import scala.reflect.runtime.universe._
        import scala.reflect.runtime.currentMirror
        import scala.tools.reflect.ToolBox
    
        lazy val toolbox = currentMirror.mkToolBox()
        lazy val func = {
          println("reflected function") // triggered at every worker
          toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
        }
    
        def apply(s: String): Int = func(s)
      }
    }
    

    Then, calling sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show works just fine.

    Feel free to comment out the println - it is just an easy way of counting how many times the reflection happened. In spark-shell --master 'local' that's only once, but in spark-shell --master 'local[2]' it's twice.

    How it works

    The UDF gets evaluated immediately, but it never gets used until it reaches the worker nodes, so the lazy values toolbox and func only get evaluated on the workers. Furthermore, since they are lazy, they only ever get evaluated once per worker.