Tags: arrays, apache-spark, pyspark, struct, type-conversion

Transform and filter array of structs with parent struct field name


I am trying to go one step further than this StackOverflow post (Convert struct of structs to array of structs pulling struct field name inside). I need to pull the struct field name, filter each array of structs by role value, and transform each struct element into a new struct that includes the extracted struct field name.

Input:

 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- struct_key: string (nullable = true)
 |    |    |-- two: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- role: string (nullable = true)

{
    "a": [{
            "two": [{
                "name": "person1",
                "role": "role1"
            },
            {
                "name": "person2",
                "role": "role1"
            },
            {
                "name": "person3",
                "role": "role2"
            }],
            "struct_key": "test1"
        },
        {
            "two": [{
                "name": "person4",
                "role": "role1"
            },
            {
                "name": "person5",
                "role": "role1"
            },
            {
                "name": "person6",
                "role": "role2"
            }],
            "struct_key": "test2"
        }
    ]
}
input = {
    "a": [{
            "two": [{
                    "name": "person1",
                    "role": "role1"
                },
                {
                    "name": "person2",
                    "role": "role1"
                },
                {
                    "name": "person3",
                    "role": "role2"
                }
            ],
            "struct_key": "test1"
        },
        {
            "two": [{
                    "name": "person4",
                    "role": "role1"
                },
                {
                    "name": "person5",
                    "role": "role1"
                },
                {
                    "name": "person6",
                    "role": "role2"
                }
            ],
            "struct_key": "test2"
        }
    ]
}

df = spark.read.json(sc.parallelize([input]))
print(df.selectExpr('inline(a)').schema)

Expected output after filtering (for roles) and new struct transformation:

 |-- role_output: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- struct_key: string (nullable = true)
 |    |    |-- name: string (nullable = true)
{
    "role1_output": [
        {
            "struct_key": "test1",
            "name": "person1"
        }, 
        {
            "struct_key": "test1",
            "name": "person2"
        },
        {
            "struct_key": "test2",
            "name": "person4"
        },
        {
            "struct_key": "test2",
            "name": "person5"
        }
    ]
}

{
    "role2_output": [
        {
            "struct_key": "test1",
            "name": "person3"
        }, 
        {
            "struct_key": "test2",
            "name": "person6"
        }
    ]
}

I have tried the struct-to-map type conversion from the answer to that StackOverflow post, but I cannot figure out how to combine the extracted struct_key with the field values from the nested array of structs to create a new struct. As soon as I start transforming the array elements, I lose the struct_key field value. Any advice?
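
To make the target unambiguous, here is a plain-Python sketch of the intended transformation. It is not Spark code, and `group_by_role` is a name made up for illustration; it only pins down which outer field (`struct_key`) has to travel with each inner element (`name`, `role`).

```python
from collections import defaultdict

# Sample data with the same shape as the input above.
data = {
    "a": [
        {"struct_key": "test1", "two": [
            {"name": "person1", "role": "role1"},
            {"name": "person2", "role": "role1"},
            {"name": "person3", "role": "role2"},
        ]},
        {"struct_key": "test2", "two": [
            {"name": "person4", "role": "role1"},
            {"name": "person5", "role": "role1"},
            {"name": "person6", "role": "role2"},
        ]},
    ]
}


def group_by_role(record):
    """Return {"<role>_output": [{"struct_key": ..., "name": ...}, ...]}."""
    out = defaultdict(list)
    for outer in record["a"]:          # outer struct carries struct_key
        for inner in outer["two"]:     # inner struct carries name and role
            out[inner["role"] + "_output"].append(
                {"struct_key": outer["struct_key"], "name": inner["name"]}
            )
    return dict(out)


result = group_by_role(data)
print(result)
```

The inner loop can still see `outer`, which is exactly the part that gets lost when the inner array is transformed on its own.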


Solution

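  • Flatten the outer array with inline, explode the inner array, then group by role and collect the (name, struct_key) pairs: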

    from pyspark.sql.functions import collect_list, explode, struct
    
    input = {
        "a": [{
                "two": [{
                        "name": "person1",
                        "role": "role1"
                    },
                    {
                        "name": "person2",
                        "role": "role1"
                    },
                    {
                        "name": "person3",
                        "role": "role2"
                    }
                ],
                "struct_key": "test1"
            },
            {
                "two": [{
                        "name": "person4",
                        "role": "role1"
                    },
                    {
                        "name": "person5",
                        "role": "role1"
                    },
                    {
                        "name": "person6",
                        "role": "role2"
                    }
                ],
                "struct_key": "test2"
            }
        ]
    }
    
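    df = (
        spark.read.json(sc.parallelize([input]))
        .selectExpr('inline(a)')               # one row per outer struct: struct_key, two
        .select('struct_key', explode('two'))  # one row per inner struct, exposed as `col`
        .groupBy('col.role')                   # grouping column comes out as `role`
        .agg(collect_list(struct('col.name', 'struct_key')))
    )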
    df.show(truncate=False)
    df.printSchema()
    

    Gives you:

    >>> df.show(truncate=False)
    +-----+------------------------------------------------------------------------+
    |role |collect_list(struct(col.name, struct_key))                              |
    +-----+------------------------------------------------------------------------+
    |role2|[{person3, test1}, {person6, test2}]                                    |
    |role1|[{person1, test1}, {person2, test1}, {person4, test2}, {person5, test2}]|
    +-----+------------------------------------------------------------------------+
    
    >>> df.printSchema()
    root
     |-- role: string (nullable = true)
     |-- collect_list(struct(col.name, struct_key)): array (nullable = false)
     |    |-- element: struct (containsNull = false)
     |    |    |-- name: string (nullable = true)
     |    |    |-- struct_key: string (nullable = true)