pyspark azure-synapse

How to define json schema from nested arrays


I'm working in Synapse and I need to define the JSON schema to read a stream, but I'm having some trouble getting it to work. This is what the JSON looks like:

{
    "data": [
        [
            "2023-07-02T09:00:00",
            [
                [
                    49.998,
                    1062
                ],
                [
                    49.993529,
                    1054
                ]
            ]
        ],
        [
            "2023-07-02T09:15:00",
            [
                [
                    49.982727,
                    1116.363636
                ],
                [
                    49.981176,
                    1093.117647
                ]
            ]
        ]
    ],
    "objects": [
        "inverters/3",
        "inverters/4"
    ],
    "indicators": [
        "frequency",
        "power.ac"
    ]
}

This is how I've tried to define the schema:

    json_schema  = StructType([
        StructField("data", ArrayType(ArrayType(ArrayType(ArrayType(StringType(), containsNull=True), containsNull=True), containsNull=True), containsNull=True), True),
        StructField("indicators", ArrayType(StringType(), containsNull=True), True),
        StructField("objects", ArrayType(StringType(), containsNull=True), True)
    ])

But everything in the data column comes back null, with a warning about corrupt data, even though the data is fine. I also tried a plain batch read (not a stream) to get the inferred JSON schema, and this is what I got:

    json_schema  = StructType([
        StructField("data", ArrayType(ArrayType(StringType(), containsNull=True), containsNull=True), True),
        StructField("indicators", ArrayType(StringType(), containsNull=True), True),
        StructField("objects", ArrayType(StringType(), containsNull=True), True)
    ])

The problem with this is that when I need to explode the nested values, Spark can't, because they are defined as strings. What is the correct JSON schema for this type of data?


Solution

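  • Each element of data is a pair that mixes a string timestamp with a nested array of numbers, so no single nested array type fits it; that is most likely why the four-level ArrayType schema yields nulls. Read the inner elements as strings instead (the same schema that inference produces); the nested array then arrives as JSON text that from_json can parse afterwards: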

    from pyspark.sql.types import ArrayType, StringType, StructField, StructType

    json_schema = StructType([
        StructField("data", ArrayType(ArrayType(StringType(), containsNull=True), containsNull=True), True),
        StructField("indicators", ArrayType(StringType(), containsNull=True), True),
        StructField("objects", ArrayType(StringType(), containsNull=True), True)
    ])
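
The reason the inner elements have to be read as strings can be checked outside Spark. The snippet below is only a plain-Python illustration using the standard json module, not Spark code: it shows that every entry of data holds a string next to a nested list, and it mimics (as an approximation) how the nested list survives as JSON text and is parsed into doubles afterwards. The variable names are made up for this sketch.

```python
import json

# Trimmed copy of the sample payload from the question.
payload = json.loads("""
{
  "data": [
    ["2023-07-02T09:00:00", [[49.998, 1062], [49.993529, 1054]]],
    ["2023-07-02T09:15:00", [[49.982727, 1116.363636], [49.981176, 1093.117647]]]
  ],
  "objects": ["inverters/3", "inverters/4"],
  "indicators": ["frequency", "power.ac"]
}
""")

# Each entry of "data" mixes a string with a nested list, so no single
# array element type fits both positions; a string can hold either.
entry_types = [[type(item).__name__ for item in entry] for entry in payload["data"]]

# Approximation of what a string-typed read leaves in position 1: JSON text.
values_as_text = json.dumps(payload["data"][0][1])

# Approximation of what parsing that text as array<array<double>> yields.
values_parsed = [[float(v) for v in pair] for pair in json.loads(values_as_text)]

print(entry_types)
print(values_as_text)
print(values_parsed)
```
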
    

    Use the following code to transform the data:

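    import pyspark.sql.functions as F

    # One row per [timestamp, values] pair; "values" is still a JSON string at this point.
    tmp_df = df.withColumn("tmp", F.explode("data")).select(
        "*", F.col("tmp").getItem(0).alias("timestamp"), F.col("tmp").getItem(1).alias("values"))
    # Parse the JSON string into array<array<double>>, then explode twice to reach single doubles.
    tmp_df = (tmp_df.withColumn("values", F.explode(F.from_json("values", "array<array<double>>")))
              .withColumn("values", F.explode("values")).drop("data", "tmp"))
    tmp_df.show()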
    

    Output:

    indicators   objects            timestamp             values
    [power.ac]   [inverters/312e]   2023-07-02T09:00:00   1054.0
    [power.ac]   [inverters/312e]   2023-07-02T09:00:00   1090.17
    [power.ac]   [inverters/312e]   2023-07-02T09:15:00   1093.117647
    [power.ac]   [inverters/312e]   2023-07-02T09:15:00   2093.137
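
Exploding twice leaves bare numbers with no record of which object or indicator each one belongs to. As a rough sketch of the intended end result, the plain-Python function below (standard library only, not Spark code) labels every value. It assumes, and the question does not confirm this, that the outer position in each values array lines up with objects and the inner position with indicators; the helper name flatten is hypothetical.

```python
import json

# Trimmed copy of the sample payload from the question.
payload = json.loads("""
{
  "data": [
    ["2023-07-02T09:00:00", [[49.998, 1062], [49.993529, 1054]]],
    ["2023-07-02T09:15:00", [[49.982727, 1116.363636], [49.981176, 1093.117647]]]
  ],
  "objects": ["inverters/3", "inverters/4"],
  "indicators": ["frequency", "power.ac"]
}
""")


def flatten(payload):
    """Return (timestamp, object, indicator, value) tuples.

    Assumption: the i-th inner array belongs to objects[i] and its
    j-th number belongs to indicators[j].
    """
    rows = []
    for timestamp, per_object in payload["data"]:
        for obj, readings in zip(payload["objects"], per_object):
            for indicator, value in zip(payload["indicators"], readings):
                rows.append((timestamp, obj, indicator, float(value)))
    return rows


rows = flatten(payload)
for row in rows:
    print(row)
```

In Spark, a similar effect could probably be reached by swapping F.explode for F.posexplode, which also returns each element's position, and using that position to look up the matching entry in objects or indicators.
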