Tags: scala, apache-spark, apache-spark-sql, databricks, spark-structured-streaming

How to construct and persist a reference object per worker in a Spark 2.3.0 UDF?


In a Spark 2.3.0 Structured Streaming job I need to append a column to a DataFrame, derived from the value of an existing column in the same row.

I want to define this transformation in a UDF and use withColumn to build the new DataFrame.
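A minimal sketch of that shape (df, the column names, and the placeholder transform are illustrative):

import org.apache.spark.sql.functions.{col, udf}

// Placeholder per-row transform; the real one consults the expensive reference
val transform = udf((in: String) => in.reverse)

// Append a column derived from the same row's value in an existing column
val withDerived = df.withColumn("derived", transform(col("existing")))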

Doing this transform requires consulting a very-expensive-to-construct reference object -- constructing it once per record yields unacceptable performance.

What is the best way to construct and persist this object once per worker node so it can be referenced repeatedly for every record in every batch? Note that the object is not serializable.

My current attempts have revolved around subclassing UserDefinedFunction to add the expensive object as a lazy member, and giving the subclass an alternate constructor that does the initialization normally performed by the udf function. So far I've been unable to reproduce the kind of type coercion that udf does: some deep type inference wants objects of type org.apache.spark.sql.Column, while my transformation lambda takes a String for input and output.

Something like this:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._

class ExpensiveReference {
  ... // Very slow: construction work runs in the class body (the primary constructor)
  def transformString(in: String): String = ... // Fast
}

class PersistentValUDF(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]])
    extends UserDefinedFunction(f, dataType, inputTypes) {

  lazy val expensiveReference = new ExpensiveReference()

  // Auxiliary constructor meant to do the wiring normally performed by udf()
  def this() = this(
    ((in: String) => expensiveReference.transformString(in)): String => String,
    StringType,
    Some(List(StringType))
  )
}
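As far as I can tell, the mismatch is the UserDefinedFunction.apply boundary: in Spark 2.3, apply takes and returns org.apache.spark.sql.Column, while the wrapped lambda operates on plain row values. A minimal sketch of that split (names illustrative):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, udf}

// The lambda works on plain Strings...
val upper = udf((in: String) => in.toUpperCase)

// ...but the resulting UserDefinedFunction is applied to Columns
val derived: Column = upper(col("existing"))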

The further I dig into this rabbit hole, the more I suspect there's a better way to accomplish this that I'm overlooking. Hence this post.

Edit: I tested lazily initializing a reference in an object declared inside the UDF; this still triggers reinitialization. Example code:

class IntBox {
  var valu = 0
  def increment(): Unit = valu += 1
  def get: Int = valu
}


import org.apache.spark.sql.functions.udf

val altUDF = udf((input: String) => {
  object ExpensiveRef {
    lazy val box = new IntBox
    def transform(in: String): String = {
      box.increment()
      in + box.get.toString
    }
  }
  ExpensiveRef.transform(input)
})

The above UDF always appends 1, so the lazy object is being reinitialized per record.
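One way to observe this on a toy DataFrame (a minimal check, assuming a SparkSession in scope as spark):

import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq("a", "b", "c").toDF("s")
// Per the behavior described above, every output value ends in "1"
df.select(altUDF(col("s"))).show()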


Solution

  • I found this post, whose Option 1 I was able to turn into a workable solution. The end result is similar to Jacek Laskowski's answer, but with a few tweaks:

    1. Pull the object definition outside of the UDF's scope. Even when lazy, it will still be reinitialized if it's defined inside the UDF's scope.
    2. Move the transform function off of the object and into the UDF's lambda (required to avoid serialization errors)
    3. Capture the object's lazy member in the closure of the UDF lambda

    Something like this:

    object ExpensiveReference {
      lazy val ref = ...
    }

    val persistentUDF = udf((input: String) => {
      /* transform code that references ExpensiveReference.ref */
    })
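
    Putting the three tweaks together, a fuller sketch; the Map here is a stand-in for the real expensive, non-serializable reference, and df and the column names are illustrative:

    import org.apache.spark.sql.functions.{col, udf}

    // Tweak 1: the object lives outside the UDF, so `ref` is initialized at
    // most once per executor JVM rather than once per record
    object ExpensiveReference {
      lazy val ref: Map[String, String] = {
        // stand-in for very slow construction (e.g. loading a huge dictionary)
        Map("a" -> "alpha")
      }
    }

    // Tweaks 2 and 3: the transform lives in the UDF's lambda and only the lazy
    // member is referenced there; since ExpensiveReference is a top-level object,
    // the reference resolves on the executor and nothing non-serializable is
    // captured in the closure
    val persistentUDF = udf((input: String) => {
      ExpensiveReference.ref.getOrElse(input, input)
    })

    // Usage:
    // df.withColumn("derived", persistentUDF(col("existing")))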