apache-spark, pyspark, apache-spark-sql, spark-streaming

How to rename existing columns and add new columns in an array?


env: spark-2.4.5

source.json

{
    "group": "1",
    "name": "badboi",
    "rank": "3",
    "fellows": [
        {
            "name": "David",
            "age": "25",
            "hobby": "code"
        },
        {
            "name": "John",
            "age": "27",
            "hobby": "tennis"
        },
        {
            "name": "Anata",
            "age": "23",
            "hobby": "dance"
        }
    ]
}

What I want is to add a new field 'ID' (the md5 of a JSON body containing 'name') to every element of the 'fellows' array, and to rename the existing fields of each element to upper case, like this:

output.json

{
    "group": "1",
    "name": "badboi",
    "rank": "3",
    "fellows": [
        {
            "ID":"6F94AF80FC86BD2DBFAFA9C90BF33522",
            "NAME": "David",
            "AGE": "25",
            "HOBBY": "code"
        },
        {
            "ID":"CF848467689DD81CAC9E644F8294B641",
            "NAME": "John",
            "AGE": "27",
            "HOBBY": "tennis"
        },
        {
            "ID":"4F11EBFF1667DDD817921279EEBD5451",
            "NAME": "Anata",
            "AGE": "23",
            "HOBBY": "dance"
        }
    ]
}

My Solution:

1

I've tried the 'explode' and 'collect_set' functions to solve it:

  import org.apache.spark.sql.functions._
  import spark.implicits._

  val source = spark.read.option("multiLine", "true").json("/mypath/source.json")

  // One row per element of 'fellows'
  val explode_values = source.select($"group", $"name", $"rank", explode($"fellows").as("explode_fellows"))

  // Rebuild each element with upper-case field names plus the md5-based ID
  val renamedDF = explode_values.select($"group", $"name", $"rank",
    struct(
      md5(to_json(struct($"explode_fellows.name".as("NAME")))).as("ID"),
      $"explode_fellows.name".as("NAME"),
      $"explode_fellows.age".as("AGE"),
      $"explode_fellows.hobby".as("HOBBY")
    ).as("fellows"))

  // Collect the elements back into one array per group
  val result = renamedDF.groupBy($"group").agg(
    first($"name").as("name"),
    first($"rank").as("rank"),
    collect_set($"fellows").as("fellows"))

Then the result's schema is:

root
 |-- group: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- fellows: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ID: string (nullable = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- AGE: string (nullable = true)
 |    |    |-- HOBBY: string (nullable = true)

2

Using 'arrays_zip' can only rename the columns:

val result2 = source.select($"group", $"name", $"rank", arrays_zip($"fellows.name", $"fellows.age", $"fellows.hobby").cast("array<struct<NAME: string, AGE:string, HOBBY:string>>").as("fellows"))

Then the result's schema is:

root
 |-- group: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- fellows: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- AGE: string (nullable = true)
 |    |    |-- HOBBY: string (nullable = true)

Note:

The 'explode' and 'collect_set' solution doesn't meet my needs because it is too complicated.

Alternatively, it would help a lot if md5 ID generation could be added to my solution 2.

I would appreciate any suggestions.


Solution

  • If I understand correctly, an alternative is as follows:

    1. Load the data

        val data =
          """
            |{
            |    "group": "1",
            |    "name": "badboi",
            |    "rank": "3",
            |    "fellows": [
            |        {
            |            "name": "David",
            |            "age": "25",
            |            "hobby": "code"
            |        },
            |        {
            |            "name": "John",
            |            "age": "27",
            |            "hobby": "tennis"
            |        },
            |        {
            |            "name": "Anata",
            |            "age": "23",
            |            "hobby": "dance"
            |        }
            |    ]
            |}
          """.stripMargin
    
        val df = spark.read.option("multiLine", "true").json(Seq(data).toDS())
        df.show(false)
        df.printSchema()
    
        /**
          * +-----------------------------------------------------------+-----+------+----+
          * |fellows                                                    |group|name  |rank|
          * +-----------------------------------------------------------+-----+------+----+
          * |[[25, code, David], [27, tennis, John], [23, dance, Anata]]|1    |badboi|3   |
          * +-----------------------------------------------------------+-----+------+----+
          *
          * root
          * |-- fellows: array (nullable = true)
          * |    |-- element: struct (containsNull = true)
          * |    |    |-- age: string (nullable = true)
          * |    |    |-- hobby: string (nullable = true)
          * |    |    |-- name: string (nullable = true)
          * |-- group: string (nullable = true)
          * |-- name: string (nullable = true)
          * |-- rank: string (nullable = true)
          */
    
    

    2. Use TRANSFORM to add an md5-based ID (the md5 of the JSON string built from x.name) and upper-case the struct field names

        val processedDF = df.withColumn("fellows",
          expr("TRANSFORM(fellows, x -> named_struct('ID', md5(to_json(named_struct('ID', x.name))), 'NAME', x.name, 'AGE', x.age, 'HOBBY', x.hobby))"))
        processedDF.show(false)
        processedDF.printSchema()
    
        /**
          * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
          * |fellows                                                                                                                                                          |group|name  |rank|
          * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
          * |[[464e07afc9e46359fb480839150595c5, David, 25, code], [61409aa1fd47d4a5332de23cbf59a36f, John, 27, tennis], [540356fa1779480b07d0743763c78159, Anata, 23, dance]]|1    |badboi|3   |
          * +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
          *
          * root
          * |-- fellows: array (nullable = true)
          * |    |-- element: struct (containsNull = false)
          * |    |    |-- ID: string (nullable = true)
          * |    |    |-- NAME: string (nullable = true)
          * |    |    |-- AGE: string (nullable = true)
          * |    |    |-- HOBBY: string (nullable = true)
          * |-- group: string (nullable = true)
          * |-- name: string (nullable = true)
          * |-- rank: string (nullable = true)
          */
        processedDF.toJSON.show(false)
    
    //    {
    //      "fellows": [{
    //      "ID": "464e07afc9e46359fb480839150595c5",
    //      "NAME": "David",
    //      "AGE": "25",
    //      "HOBBY": "code"
    //    }, {
    //      "ID": "61409aa1fd47d4a5332de23cbf59a36f",
    //      "NAME": "John",
    //      "AGE": "27",
    //      "HOBBY": "tennis"
    //    }, {
    //      "ID": "540356fa1779480b07d0743763c78159",
    //      "NAME": "Anata",
    //      "AGE": "23",
    //      "HOBBY": "dance"
    //    }],
    //      "group": "1",
    //      "name": "badboi",
    //      "rank": "3"
    //    }
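
The question's output.json shows upper-case IDs and asks for the hash of a JSON body keyed by 'name', while the code above hashes a body keyed by 'ID' and returns lower-case hex. The following is a minimal sketch of two variants under those assumptions: one adjusts the TRANSFORM expression, the other adds the ID to the asker's arrays_zip approach. The names viaTransform, viaZip and ids are made up for illustration, and the sample data is shortened. Whether either variant reproduces the exact IDs in output.json depends on the precise string the asker hashed, which the post does not specify.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{arrays_zip, col, expr}

// In spark-shell, getOrCreate() returns the existing session
val spark = SparkSession.builder().master("local[1]").appName("fellows-id-sketch").getOrCreate()
import spark.implicits._

// Shortened sample with the same shape as source.json
val json = """{"group":"1","name":"badboi","rank":"3","fellows":[{"name":"David","age":"25","hobby":"code"},{"name":"John","age":"27","hobby":"tennis"}]}"""
val df = spark.read.json(Seq(json).toDS())

// Variant A: same TRANSFORM idea, but hash {"name": ...} and upper-case the hex digest
val viaTransform = df.withColumn("fellows", expr(
  """TRANSFORM(fellows, x -> named_struct(
    |  'ID', upper(md5(to_json(named_struct('name', x.name)))),
    |  'NAME', x.name, 'AGE', x.age, 'HOBBY', x.hobby))""".stripMargin))

// Variant B: build the array of IDs first, then zip it with the other field arrays
// and rename the zipped struct fields through a cast, as in the asker's solution 2
val ids = expr("TRANSFORM(fellows.name, n -> upper(md5(to_json(named_struct('name', n)))))")
val viaZip = df.withColumn("fellows",
  arrays_zip(ids, col("fellows.name"), col("fellows.age"), col("fellows.hobby"))
    .cast("array<struct<ID:string,NAME:string,AGE:string,HOBBY:string>>"))

viaTransform.toJSON.show(false)
viaZip.toJSON.show(false)
```

Both variants keep one row per group and avoid the explode/groupBy round trip. Variant A is probably easier to read; variant B may be preferable if the rest of the pipeline already relies on arrays_zip.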
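
To see why the choice of JSON key matters for the ID, the hash can be recomputed outside Spark. This is a rough sketch in plain Scala, assuming that to_json renders the struct as compact JSON such as {"ID":"David"} and that Spark's md5 is the lower-case hex digest of the UTF-8 bytes; md5Hex is a hypothetical helper, not part of Spark.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical helper: lower-case hex MD5 of a string's UTF-8 bytes
def md5Hex(s: String): String =
  MessageDigest.getInstance("MD5")
    .digest(s.getBytes(StandardCharsets.UTF_8))
    .map(b => f"${b & 0xff}%02x")
    .mkString

// The string hashed by the TRANSFORM expression above (key 'ID')
val hashedWithIdKey = md5Hex("""{"ID":"David"}""")
// The string hashed if the key is 'name', as the question describes
val hashedWithNameKey = md5Hex("""{"name":"David"}""")

println(hashedWithIdKey)
println(hashedWithNameKey)
println(hashedWithNameKey.toUpperCase) // upper-case form, as in output.json
```

The two digests differ because the hashed strings differ, so the key used inside named_struct has to match whatever downstream consumers of the ID expect.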