Tags: scala, apache-spark, apache-spark-sql, user-defined-functions, udf

Spark UDF with non-column parameters


I want to pass a variable, not a column, to a UDF in Spark.

The map has the format described in Spark dataframe to nested map.

val joinUDF = udf((replacementLookup: Map[String, Double], newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

It should be applied like this:

columnsMap.foldLeft(df) { (currentDF, colName) =>
  println(colName._1)
  println(colName._2)
  currentDF
    .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))
}
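
For reference, judging from how it is used above, columnsMap is presumably a Map[String, Map[String, Double]] keyed by column name. A hypothetical example (names and values invented for illustration):

// Hypothetical: column name -> replacement lookup table
val columnsMap: Map[String, Map[String, Double]] = Map(
  "category" -> Map("a" -> 1.0, "b" -> 2.0),
  "status"   -> Map("open" -> 0.5, "closed" -> 1.5)
)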

But this throws:

type mismatch;
[error]  found   : Map
[error]  required: org.apache.spark.sql.Column
[error]           .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))

Solution

  • If you want to pass literals to a UDF, use org.apache.spark.sql.functions.lit,

    i.e. use joinUDF(lit(colName._2), col(colName._1))
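
    For example, with a scalar parameter this works directly (a minimal sketch; the DataFrame, column names, and values are illustrative):

    import org.apache.spark.sql.functions.{col, lit, udf}

    // The scalar threshold is wrapped in lit(...) so it becomes a Column
    val thresholdUDF = udf((threshold: Double, value: Double) =>
      if (value > threshold) 1.0 else 0.0)

    df.withColumn("flag", thresholdUDF(lit(0.5), col("score")))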

    But Maps aren't supported by lit, so you have to rewrite your code, e.g. by partially applying the Map argument before creating the UDF:

    val joinFunction = (replacementLookup: Map[String, Double], newValue: String) => {
      replacementLookup.get(newValue) match {
        case Some(tt) => tt
        case None => 0.0
      }
    }

    columnsMap.foldLeft(df) { (currentDF, colName) =>
      // Partially apply the lookup Map, leaving a String => Double function
      // that udf can lift into a one-column UDF
      val joinUDF = udf(joinFunction(colName._2, _: String))
      currentDF
        .withColumn("myColumn_" + colName._1, joinUDF(col(colName._1)))
    }
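
    As an aside, on Spark 2.2 and later, org.apache.spark.sql.functions.typedLit can create Map literals, so a two-argument UDF also works without rewriting. A sketch, assuming Spark 2.2+ (note that Spark hands MapType values to Scala UDFs as scala.collection.Map):

    import org.apache.spark.sql.functions.{col, typedLit, udf}

    // typedLit turns the Map into a literal MapType column
    val joinUDF = udf((replacementLookup: scala.collection.Map[String, Double], newValue: String) =>
      replacementLookup.getOrElse(newValue, 0.0))

    columnsMap.foldLeft(df) { (currentDF, colName) =>
      currentDF.withColumn("myColumn_" + colName._1,
        joinUDF(typedLit(colName._2), col(colName._1)))
    }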