Tags: azure, apache-spark, pyspark, azure-synapse

PySpark: Parsing JSON files where column names are defined once in the header


I am trying to use PySpark to parse JSON files where the column names are defined once in the header. I'm stuck trying to zip/combine an array of column names with an array of values.

json_example = """{
    "Header":
    {
        "Foo": 'bar',
        "SignalList": [
            {"Name": "id", "Type": "integer"},
            {"Name": "field01", "Type": "float"},
            {"Name": "field02", "Type": "float"}
        ]
    },
    "Payload": [
        {
            "Data": [
                [10001, 2.2, -102.4],
                [10002, 2.3, -102.3],
                [10003, 2.2, -102.4],
                [10004, 2.6, -102.4],
                [10005, 2.5, -102.5]
            ]
        },
        {
            "Data":[
                [10006, 2.4, -102.3],
                [10007, 2.5, -102.5],
                [10008, 2.5, -102.5],
                [10009, 2.6, -102.4]
            ]
        }
    ]
}"""

My goal is to get it to the following format:

+-------+-------+-------+------+
|    id |field01|field02|   foo|
+-------+-------+-------+------+
|  10001|    2.2| -102.4|   bar|
|  10002|    2.3| -102.3|   bar|
|  10003|    2.2| -102.4|   bar|
|  10004|    2.6| -102.4|   bar|
|  10005|    2.5| -102.5|   bar|
|  10006|    2.4| -102.3|   bar|
|  10007|    2.5| -102.5|   bar|
|  10008|    2.5| -102.5|   bar|
|  10009|    2.6| -102.4|   bar|
+-------+-------+-------+------+

So far, I'm loading the column names and data values as arrays:

import pyspark.sql.functions as F

# Read JSON
df = spark.read.option("multiLine", "true").json(sc.parallelize([json_example]))
# Copy
df1 = df
# Get column names as an array column
df1 = df1.withColumn("colnames", F.col("Header.SignalList.Name"))
# Explode twice: once per Payload element, once per Data row
df1 = df1.withColumn("values", F.explode("Payload.Data"))
df1 = df1.withColumn("values", F.explode("values"))
# Get metadata from Header
df1 = df1.withColumn("foo", F.col("Header.Foo"))
df1 = df1.select("foo", "colnames", "values")
df1.show(truncate=False)

Which results in this:

+---+----------------------+----------------------+
|foo|colnames              |values                |
+---+----------------------+----------------------+
|bar|[id, field01, field02]|[10001.0, 2.2, -102.4]|
|bar|[id, field01, field02]|[10002.0, 2.3, -102.3]|
|bar|[id, field01, field02]|[10003.0, 2.2, -102.4]|
|bar|[id, field01, field02]|[10004.0, 2.6, -102.4]|
|bar|[id, field01, field02]|[10005.0, 2.5, -102.5]|
|bar|[id, field01, field02]|[10006.0, 2.4, -102.3]|
|bar|[id, field01, field02]|[10007.0, 2.5, -102.5]|
|bar|[id, field01, field02]|[10008.0, 2.5, -102.5]|
|bar|[id, field01, field02]|[10009.0, 2.6, -102.4]|
+---+----------------------+----------------------+

I'm stuck trying to expand the i-th value in the values array into a new column, using the i-th entry in the colnames array as the column name.

FWIW, my actual dataset will have around 100 columns in the array.
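
For concreteness, the kind of zip I have in mind looks like the sketch below (this assumes Spark 2.4+ so that map_from_arrays is available, and "m" is just a scratch column name), but I need it to work for ~100 names that are only known at runtime:

# Sketch, assuming Spark 2.4+; "m" is a hypothetical scratch column
df2 = df1.withColumn("m", F.map_from_arrays("colnames", "values"))
df2 = df2.select(
    "foo",
    F.col("m")["id"].alias("id"),
    F.col("m")["field01"].alias("field01"),
    F.col("m")["field02"].alias("field02"),
)
df2.show()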


Solution

  • I ended up collecting all the unique column names and using a UDF to look up the value for each column by name:

    import pyspark.sql.functions as F
    from pyspark.sql.types import DoubleType

    # Define a UDF that looks a name up in the 'colnames' array and
    # returns the value at the same index in the 'values' array
    def lookup_by_name(name, colnames, values):
        if name in colnames:
            return values[colnames.index(name)]
        return None

    lookup_by_name_udf = F.udf(lookup_by_name, DoubleType())

    # Get all unique column names across all rows & arrays in "colnames"
    unique_colnames = [
        r.colnames
        for r in df1.withColumn("colnames", F.explode("colnames"))
                    .select("colnames")
                    .distinct()
                    .collect()
    ]

    # Add one column per unique name, looked up from the arrays
    for name in unique_colnames:
        df1 = df1.withColumn(
            name, lookup_by_name_udf(F.lit(name), F.col("colnames"), F.col("values"))
        )

    # Drop the array columns
    df1 = df1.drop("colnames", "values")

    # Show
    df1.show(truncate=False)
    

    Result:

    +---+-------+-------+-------+
    |foo|field02|field01|id     |
    +---+-------+-------+-------+
    |bar|-102.4 |2.2    |10001.0|
    |bar|-102.3 |2.3    |10002.0|
    |bar|-102.4 |2.2    |10003.0|
    |bar|-102.4 |2.6    |10004.0|
    |bar|-102.5 |2.5    |10005.0|
    |bar|-102.3 |2.4    |10006.0|
    |bar|-102.5 |2.5    |10007.0|
    |bar|-102.5 |2.5    |10008.0|
    |bar|-102.4 |2.6    |10009.0|
    +---+-------+-------+-------+
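
    One loose end: because Spark infers each Data row as array<double>, id comes back as 10001.0. If the original types matter, the Type metadata in Header.SignalList can drive a cast afterwards. A rough sketch, assuming a hand-written mapping from the header's type names to Spark SQL types:

    # Hypothetical mapping from the header's type names to Spark SQL types
    type_map = {"integer": "bigint", "float": "double"}

    # Collect (Name, Type) pairs from the header, then cast each column
    signals = (df.select(F.explode("Header.SignalList").alias("s"))
                 .select("s.Name", "s.Type")
                 .collect())
    for r in signals:
        df1 = df1.withColumn(r["Name"], F.col(r["Name"]).cast(type_map[r["Type"]]))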