I'm working in Synapse and need to define the JSON schema to read a stream, but I'm having trouble getting it to work. This is what the JSON looks like:
{
  "data": [
    [
      "2023-07-02T09:00:00",
      [
        [49.998, 1062],
        [49.993529, 1054]
      ]
    ],
    [
      "2023-07-02T09:15:00",
      [
        [49.982727, 1116.363636],
        [49.981176, 1093.117647]
      ]
    ]
  ],
  "objects": ["inverters/3", "inverters/4"],
  "indicators": ["frequency", "power.ac"]
}
This is how I've tried to define the schema:
json_schema = StructType([
StructField("data", ArrayType(ArrayType(ArrayType(ArrayType(StringType(), containsNull=True), containsNull=True), containsNull=True), containsNull=True), True),
StructField("indicators", ArrayType(StringType(), containsNull=True), True),
StructField("objects", ArrayType(StringType(), containsNull=True), True)
])
But every value in the data column comes back null, with a warning about corrupt records, even though the data is valid. I've also tried a regular (non-streaming) read to get the inferred schema, and this is what I get:
json_schema = StructType([
StructField("data", ArrayType(ArrayType(StringType(), containsNull=True), containsNull=True), True),
StructField("indicators", ArrayType(StringType(), containsNull=True), True),
StructField("objects", ArrayType(StringType(), containsNull=True), True)
])
The problem with this is that when I try to explode the inner values, Spark can't, because they're typed as strings. What is the correct JSON schema for this type of data?
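Each element of data is a mixed-type array (a timestamp string followed by a nested array of numbers), and a Spark ArrayType can hold only one element type, so no stack of nested ArrayTypes matches it and the parser flags the records as corrupt. Read data with the inferred schema instead, where the inner element type is StringType. Spark then keeps the nested array as raw JSON text, which you can parse afterwards with from_json: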
from pyspark.sql.types import StructType, StructField, ArrayType, StringType

json_schema = StructType([
    StructField("data", ArrayType(ArrayType(StringType(), containsNull=True), containsNull=True), True),
    StructField("indicators", ArrayType(StringType(), containsNull=True), True),
    StructField("objects", ArrayType(StringType(), containsNull=True), True)
])
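A quick check on the sample from the question confirms that every entry of data mixes a string with a nested list, which is exactly what a single ArrayType element type can't describe. This is a plain-Python illustration using only the standard library (element_types is a hypothetical helper), not Spark code:

```python
import json

# The sample document from the question
SAMPLE = """
{"data": [["2023-07-02T09:00:00", [[49.998, 1062], [49.993529, 1054]]],
          ["2023-07-02T09:15:00", [[49.982727, 1116.363636], [49.981176, 1093.117647]]]],
 "objects": ["inverters/3", "inverters/4"],
 "indicators": ["frequency", "power.ac"]}
"""

def element_types(doc):
    """Return the Python type name of each item inside every entry of doc["data"]."""
    return [[type(item).__name__ for item in entry] for entry in doc["data"]]

doc = json.loads(SAMPLE)
print(element_types(doc))  # [['str', 'list'], ['str', 'list']]
```

Position 0 is always a string and position 1 is always a list, so the only common Spark type for both is StringType.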
Then explode data, split each entry into a timestamp and a values string, and parse values with from_json before exploding it:
import pyspark.sql.functions as F

# One row per [timestamp, values] entry; values is still raw JSON text at this point
tmp_df = (df.withColumn("tmp", F.explode("data"))
          .select("*", F.col("tmp").getItem(0).alias("timestamp"), F.col("tmp").getItem(1).alias("values")))
# Parse the JSON text, explode once per inner array, then once more per value
tmp_df = (tmp_df.withColumn("values", F.explode(F.from_json("values", "array<array<double>>")))
          .withColumn("values", F.explode("values")).drop("data", "tmp"))
tmp_df.show()
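To sanity-check what the explode steps produce, here is a plain-Python mirror of the same pipeline (no Spark involved). The ROWS literal assumes Spark stores each nested array as compact raw JSON text under the StringType schema, and flatten is a hypothetical helper written for this illustration:

```python
import json

# Assumed shape of "data" after reading with array<array<string>>:
# the timestamp is a string and the nested array is raw JSON text.
ROWS = [
    ["2023-07-02T09:00:00", "[[49.998,1062],[49.993529,1054]]"],
    ["2023-07-02T09:15:00", "[[49.982727,1116.363636],[49.981176,1093.117647]]"],
]

def flatten(rows):
    """Mimic explode(data) -> getItem(0)/(1) -> from_json -> explode -> explode."""
    out = []
    for timestamp, raw in rows:           # explode("data"), split timestamp and values
        for inner in json.loads(raw):     # from_json(...) followed by the first explode
            for value in inner:           # second explode: one row per number
                out.append((timestamp, float(value)))
    return out

result = flatten(ROWS)
print(len(result))  # 8
print(result[:2])   # [('2023-07-02T09:00:00', 49.998), ('2023-07-02T09:00:00', 1062.0)]
```

For the sample in the question this gives 2 timestamps × 2 inner arrays × 2 values = 8 rows, with the frequency and power readings interleaved in the same values column.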
Output:
| indicators | objects | timestamp | values |
|---|---|---|---|
| [power.ac] | [inverters/312e] | 2023-07-02T09:00:00 | 1054.0 |
| [power.ac] | [inverters/312e] | 2023-07-02T09:00:00 | 1090.17 |
| [power.ac] | [inverters/312e] | 2023-07-02T09:15:00 | 1093.117647 |
| [power.ac] | [inverters/312e] | 2023-07-02T09:15:00 | 2093.137 |
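If you also want to know which object and indicator each value belongs to, one option is to pair them by position. The sketch below assumes the i-th inner array belongs to objects[i] and the j-th number in it to indicators[j]; the question doesn't confirm that ordering, and to_long_rows is a hypothetical helper in plain Python:

```python
# Assumption: inner array i <-> objects[i], value j <-> indicators[j].
SAMPLE = {
    "data": [
        ["2023-07-02T09:00:00", [[49.998, 1062], [49.993529, 1054]]],
        ["2023-07-02T09:15:00", [[49.982727, 1116.363636], [49.981176, 1093.117647]]],
    ],
    "objects": ["inverters/3", "inverters/4"],
    "indicators": ["frequency", "power.ac"],
}

def to_long_rows(doc):
    """Return (timestamp, object, indicator, value) tuples, pairing by position."""
    rows = []
    for timestamp, per_object in doc["data"]:
        for obj_idx, values in enumerate(per_object):
            for ind_idx, value in enumerate(values):
                rows.append((timestamp, doc["objects"][obj_idx],
                             doc["indicators"][ind_idx], float(value)))
    return rows

rows = to_long_rows(SAMPLE)
for row in rows[:4]:
    print(row)
```

In PySpark, the same positional pairing could be done by replacing F.explode with F.posexplode, which also returns each element's position, and using that position to index the objects and indicators arrays; treat that as an untested suggestion.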