
DataFrame user-defined function not applied unless I change column name


I want to convert a DataFrame column using a function that I add to DataFrame through an implicit conversion.

I have my DataFrame type defined, which contains additional functions:

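import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

class NpDataFrame(df: DataFrame) {
    // bytes2String(x) on an Array[Byte] is presumably a helper overload defined elsewhere (not shown here)
    def bytes2String(colName: String): DataFrame = df
       .withColumn(colName + "_tmp", udf((x: Array[Byte]) => bytes2String(x)).apply(col(colName)))
       .drop(colName)
       .withColumnRenamed(colName + "_tmp", colName)
}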

Then I define the implicit conversion in an object:

object NpDataFrameImplicits {
    implicit def toNpDataFrame(df: DataFrame): NpDataFrame = new NpDataFrame(df)
}

So finally, here is what I do in a small FunSuite unit test:

test("example: call to bytes2String") {
    val df: DataFrame = ...
    df.select("header.ID").show() // (1)
    df.bytes2String("header.ID").withColumnRenamed("header.ID", "id").select("id").show() // (2)
    df.bytes2String("header.ID").select("header.ID").show() // (3)
}

Show #1

+-------------------------------------------------+
|ID                                               |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+

Show #2

+------------------------------------+
|id                                  |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+

Show #3

+-------------------------------------------------+
|ID                                               |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+

As you can see, the third show (the one without the column renaming) does not work as expected and prints a non-converted ID column. Does anyone know why?

EDIT:

Output of df.select(col("header.ID") as "ID").bytes2String("ID").show():

+------------------------------------+
|ID                                  |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+

Solution

  • Let me explain what is happening in your conversion function with the example below. First, create a DataFrame:

    val jsonString: String =
        """{
          | "employee": {
          |   "id": 12345,
          |   "name": "krishnan"
          | },
          | "_id": 1
          |}""".stripMargin
    
      val jsonRDD: RDD[String] = sc.parallelize(Seq(jsonString, jsonString))
    
      val df: DataFrame = sparkSession.read.json(jsonRDD)
      df.printSchema()
    

    Output structure:

    root
     |-- _id: long (nullable = true)
     |-- employee: struct (nullable = true)
     |    |-- id: long (nullable = true)
     |    |-- name: string (nullable = true)
    

    A conversion function similar to yours:

    def myConversion(myDf: DataFrame, colName: String): DataFrame = {
        myDf.withColumn(colName + "_tmp", udf((x: Long) => (x+1).toString).apply(col(colName)))
          .drop(colName)
          .withColumnRenamed(colName + "_tmp", colName)
      }
    

    Scenario 1: apply the conversion to a root-level field.

    myConversion(df, "_id").show()
    myConversion(df, "_id").select("_id").show()
    

    Result:

    +----------------+---+
    |        employee|_id|
    +----------------+---+
    |[12345,krishnan]|  2|
    |[12345,krishnan]|  2|
    +----------------+---+
    +---+
    |_id|
    +---+
    |  2|
    |  2|
    +---+
    

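    Scenario 2: apply the conversion to employee.id. With a dotted name, withColumn adds a new root-level column whose name is literally "employee.id_tmp", drop("employee.id") finds no root-level column with that name and removes nothing, and withColumnRenamed then renames the new column to "employee.id". The nested id field inside employee is never touched, so select("employee.id") still resolves to the original nested value. This is the expected behavior, and it is the same thing that happens with header.ID in your third show.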

    myConversion(df, "employee.id").show()
    myConversion(df, "employee.id").select("employee.id").show()
    

    Result:

    +---+----------------+-----------+
    |_id|        employee|employee.id|
    +---+----------------+-----------+
    |  1|[12345,krishnan]|      12346|
    |  1|[12345,krishnan]|      12346|
    +---+----------------+-----------+
    +-----+
    |   id|
    +-----+
    |12345|
    |12345|
    +-----+
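
To make Scenario 2 concrete, here is a minimal sketch. It assumes a local SparkSession and two hypothetical case classes (NestedEmployee, NestedRecord) that exist only to build sample data with the same shape as the JSON above. It illustrates that the converted value ends up in a root-level column literally named employee.id, which can be selected by quoting the name with backticks, while the unquoted name still resolves to the untouched nested field.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical case classes, used only to build sample data
case class NestedEmployee(id: Long, name: String)
case class NestedRecord(_id: Long, employee: NestedEmployee)

object DottedColumnDemo {
  // Same conversion as myConversion in the answer
  def myConversion(myDf: DataFrame, colName: String): DataFrame =
    myDf.withColumn(colName + "_tmp", udf((x: Long) => (x + 1).toString).apply(col(colName)))
      .drop(colName)
      .withColumnRenamed(colName + "_tmp", colName)

  // One-row DataFrame with a root-level _id and a nested employee struct
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(NestedRecord(1L, NestedEmployee(12345L, "krishnan"))).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("dotted-column-demo").getOrCreate()
    val converted = myConversion(sample(spark), "employee.id")

    // Root-level column names, including the one that contains a dot
    println(converted.columns.mkString(", "))

    // Unquoted: resolves to the nested field employee.id (not converted)
    converted.select("employee.id").show()

    // Backtick-quoted: resolves to the root-level column named "employee.id" (converted)
    converted.select("`employee.id`").show()

    spark.stop()
  }
}
```

The backticks tell Spark to treat the whole string as one column name instead of a path into a struct, which is why the two select calls return different data.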
    

    Scenario 3: select the inner field to the root level first, then apply the conversion.

    myConversion(df.select("employee.id"), "id").show()
    

    Result:

    +-----+
    |   id|
    +-----+
    |12346|
    |12346|
    +-----+
    

    My new conversion function takes a struct-type column, converts it, and stores the result back as a struct-type column. Here we pass the employee column and convert only its id field, so the change is applied to the root-level employee column.

    case class Employee(id: String, name: String)
    
    def myNewConversion(myDf: DataFrame, colName: String): DataFrame = {
        myDf.withColumn(colName + "_tmp", udf((row: Row) => Employee((row.getLong(0)+1).toString, row.getString(1))).apply(col(colName)))
          .drop(colName)
          .withColumnRenamed(colName + "_tmp", colName)
      }
    

    Your scenario 3, using my new conversion function:

    myNewConversion(df, "employee").show()
    myNewConversion(df, "employee").select("employee.id").show()
    

    Result:

    +---+----------------+
    |_id|        employee|    
    +---+----------------+
    |  1|[12346,krishnan]|
    |  1|[12346,krishnan]|
    +---+----------------+
    +-----+
    |   id|
    +-----+
    |12346|
    |12346|
    +-----+
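
As an alternative to a Row-based UDF, the nested field can also be replaced with column expressions. The following is only a sketch under assumptions: it relies on Column.withField, which is available from Spark 3.1 onward, and the names convertNested, SampleEmployee and SampleRecord are hypothetical helpers introduced for this illustration. It applies the same "+1 then to string" conversion to employee.id while keeping the field inside the struct.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical case classes, used only to build sample data
case class SampleEmployee(id: Long, name: String)
case class SampleRecord(_id: Long, employee: SampleEmployee)

object WithFieldDemo {
  // Replaces one field of a struct column in place (assumes Spark 3.1+ for withField)
  def convertNested(myDf: DataFrame, structName: String, fieldName: String): DataFrame =
    myDf.withColumn(
      structName,
      col(structName).withField(fieldName, (col(s"$structName.$fieldName") + 1).cast("string")))

  // One-row DataFrame with a root-level _id and a nested employee struct
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(SampleRecord(1L, SampleEmployee(12345L, "krishnan"))).toDF()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("with-field-demo").getOrCreate()
    val converted = convertNested(sample(spark), "employee", "id")

    converted.printSchema()
    // The nested field now holds the converted value
    converted.select("employee.id").show()

    spark.stop()
  }
}
```

Because withColumn is called with the existing root-level name employee, the struct column is replaced in place, so no temporary column, drop, or rename is needed. On Spark versions without withField, the struct would have to be rebuilt another way, for example with the UDF shown above.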