In a Spark 2.3.0 Structured Streaming job I need to append a column to a DataFrame which is derived from the value of the same row of an existing column.
I want to define this transformation in a UDF and use withColumn to build the new DataFrame.
Doing this transform requires consulting a very-expensive-to-construct reference object -- constructing it once per record yields unacceptable performance.
What is the best way to construct and persist this object once per worker node so it can be referenced repeatedly for every record in every batch? Note that the object is not serializable.
My current attempts have revolved around subclassing UserDefinedFunction to add the expensive object as a lazy member and providing an alternate constructor to this subclass that does the init normally performed by the udf function, but I've been so far unable to get it to do the kind of type coercion that udf
does -- some deep type inference is wanting objects of type org.apache.spark.sql.Column
when my transformation lambda works on a string for input and output.
Something like this:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DataType
class ExpensiveReference{
def ExpensiveReference() = ... // Very slow
def transformString(in:String) = ... // Fast
}
class PersistentValUDF(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]]) extends UserDefinedFunction(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]]){
lazy val ExpensiveReference = new ExpensiveReference()
def PersistentValUDF(){
this(((in:String) => ExpensiveReference.transformString(in) ):(String => String), StringType, Some(List(StringType)))
}
}
The further I dig into this rabbit hole the more I suspect there's a better way to accomplish this that I'm overlooking. Hence this post.
Edit: I tested initializing a reference lazily in an object declared in the UDF; this triggers reinitialization. Example code and object
class IntBox {
var valu = 0;
def increment {
valu = valu + 1
}
def get:Int ={
return valu
}
}
val altUDF = udf((input:String) => {
object ExpensiveRef{
lazy val box = new IntBox
def transform(in:String):String={
box.increment
return in + box.get.toString
}
}
ExpensiveRef.transform(input)
})
The above UDF always appends 1; so the lazy object is being reinitialized per-record.
I found this post whose Option 1 I was able to turn into a workable solution. The end result ended up being similar to Jacek Laskowski's answer, but with a few tweaks:
Something like this:
object ExpensiveReference {
lazy val ref = ...
}
val persistentUDF = udf((input:String)=>{
/*transform code that references ExpensiveReference.ref*/
})