I want to convert a DataFrame column using an implicit function definition.
I have my DataFrame wrapper type defined, which contains additional functions:
class NpDataFrame(df: DataFrame) {
  def bytes2String(colName: String): DataFrame = df
    .withColumn(colName + "_tmp", udf((x: Array[Byte]) => bytes2String(x)).apply(col(colName)))
    .drop(colName)
    .withColumnRenamed(colName + "_tmp", colName)
}
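The bytes2String(x: Array[Byte]) helper called inside the UDF is not shown in the question; judging from the output further down, it turns a 16-byte array into a UUID string. A minimal self-contained sketch (the ByteBuffer approach is an assumption, not necessarily the asker's actual implementation):

```scala
import java.nio.ByteBuffer
import java.util.UUID

// Hypothetical helper: interpret a 16-byte array as a UUID.
// The first 8 bytes become the most significant bits, the last 8
// the least significant bits, matching UUID's canonical byte order.
def bytes2String(bytes: Array[Byte]): String = {
  require(bytes.length == 16, "expected a 16-byte UUID")
  val bb = ByteBuffer.wrap(bytes)
  new UUID(bb.getLong, bb.getLong).toString
}
```

With this, the first row of Show #1 ([62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]) maps to the first row of Show #2 (62bf580c-6c59-489c-9113-7b97e729c02f).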
Then I define my implicit conversion class:
object NpDataFrameImplicits {
implicit def toNpDataFrame(df: DataFrame): NpDataFrame = new NpDataFrame(df)
}
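For context, this enrichment ("pimp my library") pattern works the same way outside Spark; a minimal stand-alone sketch on a plain String instead of a DataFrame (names here are illustrative, not from the question):

```scala
import scala.language.implicitConversions

// Wrapper class adding an extra method to String, mirroring
// how NpDataFrame adds bytes2String to DataFrame.
class RichString(s: String) {
  def shout: String = s.toUpperCase + "!"
}

object RichStringImplicits {
  // Implicit conversion that lets plain Strings call shout.
  implicit def toRichString(s: String): RichString = new RichString(s)
}

import RichStringImplicits._
val result = "hello".shout // → "HELLO!"
```

Once the implicit is in scope, the extra method can be called directly on the wrapped type, exactly as df.bytes2String(...) is called in the test below.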
So finally, here is what I do in a small FunSuite unit test:
test("example: call to bytes2String") {
val df: DataFrame = ...
df.select("header.ID").show() // (1)
df.bytes2String("header.ID").withColumnRenamed("header.ID", "id").select("id").show() // (2)
df.bytes2String("header.ID").select("header.ID").show() // (3)
}
Show #1
+-------------------------------------------------+
|ID |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+
Show #2
+------------------------------------+
|id |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+
Show #3
+-------------------------------------------------+
|ID |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+
As you can see here, the third show
(i.e. the one without the column renaming) does not work as expected and shows a non-converted ID column. Does anyone know why?
EDIT:
Output of df.select(col("header.ID") as "ID").bytes2String("ID").show():
+------------------------------------+
|ID |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+
Let me explain what is happening in your conversion function with the example below. First, create a data frame:
val jsonString: String =
"""{
| "employee": {
| "id": 12345,
| "name": "krishnan"
| },
| "_id": 1
|}""".stripMargin
val jsonRDD: RDD[String] = sc.parallelize(Seq(jsonString, jsonString))
val df: DataFrame = sparkSession.read.json(jsonRDD)
df.printSchema()
Output structure:
root
|-- _id: long (nullable = true)
|-- employee: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- name: string (nullable = true)
A conversion function similar to yours:
def myConversion(myDf: DataFrame, colName: String): DataFrame = {
myDf.withColumn(colName + "_tmp", udf((x: Long) => (x+1).toString).apply(col(colName)))
.drop(colName)
.withColumnRenamed(colName + "_tmp", colName)
}
Scenario 1# Do the conversion for a root-level field.
myConversion(df, "_id").show()
myConversion(df, "_id").select("_id").show()
Result:
+----------------+---+
| employee|_id|
+----------------+---+
|[12345,krishnan]| 2|
|[12345,krishnan]| 2|
+----------------+---+
+---+
|_id|
+---+
| 2|
| 2|
+---+
Scenario 2# Do the conversion for employee.id. Here, withColumn("employee.id_tmp", ...) adds a brand-new top-level column whose name literally contains a dot; drop("employee.id") removes nothing, since drop does not reach inside structs; and the rename then leaves a root-level column literally named employee.id next to the untouched employee struct. A later select("employee.id") interprets the dot as struct access, so it returns the original nested field, not the converted column (you would need backticks, select("`employee.id`"), to reference the literal dotted name). This is the expected behavior.
myConversion(df, "employee.id").show()
myConversion(df, "employee.id").select("employee.id").show()
Result:
+---+----------------+-----------+
|_id| employee|employee.id|
+---+----------------+-----------+
| 1|[12345,krishnan]| 12346|
| 1|[12345,krishnan]| 12346|
+---+----------------+-----------+
+-----+
| id|
+-----+
|12345|
|12345|
+-----+
Scenario 3# Select the inner field up to the root level, then perform the conversion.
myConversion(df.select("employee.id"), "id").show()
Result:
+-----+
| id|
+-----+
|12346|
|12346|
+-----+
My new conversion function takes a struct-type field, performs the conversion, and stores the result back into the struct field itself. Here we pass the employee
field and convert only its id
field, but the change lands in the employee
field at the root
level.
case class Employee(id: String, name: String)
def myNewConversion(myDf: DataFrame, colName: String): DataFrame = {
myDf.withColumn(colName + "_tmp", udf((row: Row) => Employee((row.getLong(0)+1).toString, row.getString(1))).apply(col(colName)))
.drop(colName)
.withColumnRenamed(colName + "_tmp", colName)
}
Your scenario #3 using my new conversion function:
myNewConversion(df, "employee").show()
myNewConversion(df, "employee").select("employee.id").show()
Result#
+---+----------------+
|_id| employee|
+---+----------------+
| 1|[12346,krishnan]|
| 1|[12346,krishnan]|
+---+----------------+
+-----+
| id|
+-----+
|12346|
|12346|
+-----+