Environment: Spark 2.4.5. Sample input JSON:
{
"group": "1",
"name": "badboi",
"rank": "3",
"fellows": [
{
"name": "David",
"age": "25",
"hobby": "code"
},
{
"name": "John",
"age": "27",
"hobby": "tennis"
},
{
"name": "Anata",
"age": "23",
"hobby": "dance"
}
]
}
What I want is to add a new field 'ID' (an md5 hash generated from the element's 'name' as a JSON body) to every element of the 'fellows' array, and to rename the element's other fields to upper case, like this:
{
"group": "1",
"name": "badboi",
"rank": "3",
"fellows": [
{
"ID":"6F94AF80FC86BD2DBFAFA9C90BF33522",
"NAME": "David",
"AGE": "25",
"HOBBY": "code"
},
{
"ID":"CF848467689DD81CAC9E644F8294B641",
"NAME": "John",
"AGE": "27",
"HOBBY": "tennis"
},
{
"ID":"4F11EBFF1667DDD817921279EEBD5451",
"NAME": "Anata",
"AGE": "23",
"HOBBY": "dance"
}
]
}
I've tried the 'explode' and 'collect_set' functions to solve it:
// Assumes a spark-shell style session: `spark` is defined and `spark.implicits._` is
// imported (needed for the $"..." column syntax).
import org.apache.spark.sql.functions._

// Read the multi-line JSON document (one top-level object spanning many lines).
val source = spark.read.option("multiLine", "true").json("/mypath/source.json")

// One output row per fellow; the parent columns are repeated on every exploded row.
// NOTE: explode drops parent rows whose `fellows` array is null or empty.
val explode_values = source.select(
  $"group", $"name", $"rank",
  explode($"fellows").as("explode_fellows"))

// Rebuild each fellow as a struct with upper-case field names plus an "ID":
// the md5 hex digest of to_json(struct(name AS NAME)), i.e. of {"NAME":"<name>"}.
// Field references use the source's lower-case names consistently.
val renamedDF = explode_values.select(
  $"group", $"name", $"rank",
  struct(
    md5(to_json(struct($"explode_fellows.name".as("NAME")))).as("ID"),
    $"explode_fellows.name".as("NAME"),
    $"explode_fellows.age".as("AGE"),
    $"explode_fellows.hobby".as("HOBBY")
  ).as("fellows"))

// Re-assemble the array per parent row:
//  - grouping on all parent columns avoids first() picking arbitrary name/rank values
//    when several rows share the same group;
//  - collect_list (unlike collect_set) keeps duplicate fellows. Element order is not
//    guaranteed after the shuffle.
val result = renamedDF
  .groupBy($"group", $"name", $"rank")
  .agg(collect_list($"fellows").as("fellows"))
Then the result's schema is:
root
|-- group: string (nullable = true)
|-- name: string (nullable = true)
|-- rank: string (nullable = true)
|-- fellows: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- ID: string (nullable = true)
| | |-- NAME: string (nullable = true)
| | |-- AGE: string (nullable = true)
| | |-- HOBBY: string (nullable = true)
Using 'arrays_zip' I can only rename the fields (no ID is generated):
// Zip the per-field arrays element-wise into array<struct<0,1,2>>; the cast below then
// renames the struct fields positionally to NAME, AGE, HOBBY. No ID field is produced.
val zippedFellows = arrays_zip($"fellows.name", $"fellows.age", $"fellows.hobby")
val result2 = source.select(
  $"group",
  $"name",
  $"rank",
  zippedFellows
    .cast("array<struct<NAME: string, AGE:string, HOBBY:string>>")
    .as("fellows")
)
Then the result's schema is:
root
|-- group: string (nullable = true)
|-- name: string (nullable = true)
|-- rank: string (nullable = true)
|-- fellows: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- AGE: string (nullable = true)
| | |-- HOBBY: string (nullable = true)
The 'explode' + 'collect' solution doesn't meet my needs because it is too complicated.
Alternatively, if you could add the md5 ID generation to my second solution, that would help a lot.
I'd appreciate any suggestions.
If I understand correctly, here is another alternative, using the SQL higher-order function TRANSFORM (available in Spark 2.4 through `expr`):
// The question's sample document, embedded inline so the example needs no input file.
val data =
"""
|{
| "group": "1",
| "name": "badboi",
| "rank": "3",
| "fellows": [
| {
| "name": "David",
| "age": "25",
| "hobby": "code"
| },
| {
| "name": "John",
| "age": "27",
| "hobby": "tennis"
| },
| {
| "name": "Anata",
| "age": "23",
| "hobby": "dance"
| }
| ]
|}
""".stripMargin
// Wrap the string in a one-element Dataset[String] (needs the implicit String encoder
// from spark.implicits._) and let Spark infer the schema from it.
val jsonInput = spark.createDataset(Seq(data))
val df = spark.read.option("multiLine", true).json(jsonInput)
df.show(false)
df.printSchema()
/**
* Expected output of df.show(false) and df.printSchema():
*
* +-----------------------------------------------------------+-----+------+----+
* |fellows |group|name |rank|
* +-----------------------------------------------------------+-----+------+----+
* |[[25, code, David], [27, tennis, John], [23, dance, Anata]]|1 |badboi|3 |
* +-----------------------------------------------------------+-----+------+----+
*
* root
* |-- fellows: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- age: string (nullable = true)
* | | |-- hobby: string (nullable = true)
* | | |-- name: string (nullable = true)
* |-- group: string (nullable = true)
* |-- name: string (nullable = true)
* |-- rank: string (nullable = true)
*/
// Rewrite every element of `fellows` in place with the SQL higher-order function
// TRANSFORM (in Spark 2.4 it is only reachable through SQL expressions, hence `expr`).
// For each element x the lambda builds a struct with:
//   ID               - upper-case md5 hex digest of to_json(named_struct('ID', x.name)),
//                      i.e. of {"ID":"<name>"}; upper() is applied because md5 returns
//                      lower-case hex while the question asks for upper-case IDs.
//                      NOTE: the question's explode-based attempt hashes {"NAME":"<name>"};
//                      change the inner key to 'NAME' if IDs must agree with that approach
//                      (the hashes shown below would then differ).
//   NAME, AGE, HOBBY - the original fields, renamed to upper case.
val processedDF = df.withColumn(
  "fellows",
  expr(
    """TRANSFORM(fellows, x -> named_struct(
      |  'ID', upper(md5(to_json(named_struct('ID', x.name)))),
      |  'NAME', x.name,
      |  'AGE', x.age,
      |  'HOBBY', x.hobby))""".stripMargin))
processedDF.show(false)
processedDF.printSchema()
/**
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
* |fellows |group|name |rank|
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
* |[[464E07AFC9E46359FB480839150595C5, David, 25, code], [61409AA1FD47D4A5332DE23CBF59A36F, John, 27, tennis], [540356FA1779480B07D0743763C78159, Anata, 23, dance]]|1 |badboi|3 |
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
*
* root
* |-- fellows: array (nullable = true)
* | |-- element: struct (containsNull = false)
* | | |-- ID: string (nullable = true)
* | | |-- NAME: string (nullable = true)
* | | |-- AGE: string (nullable = true)
* | | |-- HOBBY: string (nullable = true)
* |-- group: string (nullable = true)
* |-- name: string (nullable = true)
* |-- rank: string (nullable = true)
*/
processedDF.toJSON.show(false)
// The single JSON row (shown pretty-printed here for readability):
// {
// "fellows": [{
// "ID": "464E07AFC9E46359FB480839150595C5",
// "NAME": "David",
// "AGE": "25",
// "HOBBY": "code"
// }, {
// "ID": "61409AA1FD47D4A5332DE23CBF59A36F",
// "NAME": "John",
// "AGE": "27",
// "HOBBY": "tennis"
// }, {
// "ID": "540356FA1779480B07D0743763C78159",
// "NAME": "Anata",
// "AGE": "23",
// "HOBBY": "dance"
// }],
// "group": "1",
// "name": "badboi",
// "rank": "3"
// }