Tags: azure, scala, apache-spark, azure-synapse-analytics

Problems with a UDF in Spark 3.3 (Scala)


We are migrating some code from Spark 2.4 to Spark 3.3 and are having problems with our UDF. We build the JAR (after updating the library versions in the POM) with all the logic and run it on Azure in an Apache Spark pool.

In version 2.4 the code ran correctly, but in version 3.3 we get errors.

The most significant logs are the following:

Caused by: java.lang.IllegalStateException: SparkSession should only be created and accessed on the driver.
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$assertOnDriver(SparkSession.scala:1159)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:927)
at com.XXXX.AAAA.BBBB.api.Functions$.<init>(Functions.scala:21)
at com.XXXX.AAAA.BBBB.api.Functions$.<clinit>(Functions.scala)


WARN TaskSetManager: Lost task 103.0 in stage 8.0 (TID 2936) (vm-eXXXXXXX executor 43): java.lang.NoClassDefFoundError: Could not initialize class com.XXXX.AAAA.BBBB.api.Functions

The code is the following:

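import java.time.LocalDate

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{lit, udf, when}

object Functions {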

    /********************************************************************/
    private val spark = SparkContextCustom.spark
    import spark.implicits._
    
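    val getDate = udf((year: Int, month: Int) => {
        // Last day of the month before (year, month)
        val lastDayOfPrevMonth = LocalDate.of(year, month, 1).minusDays(1)
        // Step back to the Monday of that week
        val newDate = lastDayOfPrevMonth.minusDays(lastDayOfPrevMonth.getDayOfWeek().getValue() - 1)
        newDate.getDayOfMonth()
    })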

    spark.udf.register("getDate", getDate)
        
        
    def changeValueInCol(col1: Column, col2: Column): Column = {

        // Example values; wrapped in lit(...) so they can be compared as Columns
        val year = lit(2023)
        val month = lit(10)
        val day = lit(10)
        lazy val calculatedDay = getDate(year, lit(3))

        when(month === lit(3) && day < calculatedDay, lit("I"))
          .otherwise(col2)

  }
  
    def filterTable(df: DataFrame): DataFrame = {
        /* More logic*/
        // Assumption: the computed value replaces col2
        df.withColumn("col2", changeValueInCol($"col1", $"col2"))

    }
  
}

And where we call the function:

object XXXX extends App {
  
    /********************************************************************/
    val spark = SparkContextCustom.spark
    import spark.implicits._
    implicit val paralellism: Int = SparkContextCustom.parallelism

    /* After some logic */
    val df_filtered = Functions.filterTable(df)
    
    SparkContextCustom.stop
    
}

I have tried several things, but I always get the "SparkSession should only be created and accessed on the driver" error. Running the code from a notebook works properly, but running it from the JAR does not.


Solution

  • Is either of these lines at Functions.scala:21?

    private val spark = SparkContextCustom.spark
    spark.udf.register("getDate", getDate)
    

    If so, both can only be done on the driver, and something else in Functions is being used from an executor. Move whatever the executors use out into another object that does not touch the session, and try again. Any reference to Functions runs the object's initializer, which attempts to use the SparkContext/SparkSession, and that isn't allowed on an executor. That it worked in 2.4 was mostly luck.
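
A minimal sketch of that split, under some assumptions: DateUdfs, UdfRegistration, mondayBeforeMonth and registerAll are hypothetical names that do not appear in the question, and the date arithmetic is my reading of what getDate was meant to compute. The idea is only that the object holding the UDF lambda never touches the session in its initializer, while registration takes the session as an explicit argument and is called from the driver.

```scala
import java.time.LocalDate

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Hypothetical object: safe to load on executors because its initializer
// never touches SparkSession or SparkContext.
object DateUdfs {

  // Day of month of the Monday in the week that contains the last day
  // of the month before (year, month).
  def mondayBeforeMonth(year: Int, month: Int): Int = {
    val lastDayOfPrevMonth = LocalDate.of(year, month, 1).minusDays(1)
    val monday = lastDayOfPrevMonth.minusDays(lastDayOfPrevMonth.getDayOfWeek.getValue - 1)
    monday.getDayOfMonth
  }

  // Building the UDF wrapper does not need a session.
  val getDate: UserDefinedFunction =
    udf((year: Int, month: Int) => mondayBeforeMonth(year, month))
}

// Hypothetical driver-only helper: the session is passed in explicitly
// instead of being captured in an object field.
object UdfRegistration {
  def registerAll(spark: SparkSession): Unit = {
    spark.udf.register("getDate", DateUdfs.getDate)
  }
}
```

On the driver, something like UdfRegistration.registerAll(spark) would be called once after the session is obtained, for example in the App object right after val spark = SparkContextCustom.spark. Column expressions can keep using DateUdfs.getDate(...) directly.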