I am trying to use PySpark to parse JSON files where the column names are defined once in the header. I'm getting stuck trying to zip/combine an array of column names with an array of values.
json_example = """{
"Header":
{
"Foo": 'bar',
"SignalList": [
{"Name": "id", "Type": "integer"},
{"Name": "field01", "Type": "float"},
{"Name": "field02", "Type": "float"}
]
},
"Payload": [
{
"Data": [
[10001, 2.2, -102.4],
[10002, 2.3, -102.3],
[10003, 2.2, -102.4],
[10004, 2.6, -102.4],
[10005, 2.5, -102.5]
]
},
{
"Data":[
[10006, 2.4, -102.3],
[10007, 2.5, -102.5],
[10008, 2.5, -102.5],
[10009, 2.6, -102.4]
]
}
]
}"""
My goal is to get it into the following format:
+-------+-------+-------+------+
| id |field01|field02| foo|
+-------+-------+-------+------+
| 10001| 2.2| -102.4| bar|
| 10002| 2.3| -102.3| bar|
| 10003| 2.2| -102.4| bar|
| 10004| 2.6| -102.4| bar|
| 10005| 2.5| -102.5| bar|
| 10006| 2.4| -102.3| bar|
| 10007| 2.5| -102.5| bar|
| 10008| 2.5| -102.5| bar|
| 10009| 2.6| -102.4| bar|
+-------+-------+-------+------+
So far, I'm loading the column names and data values as arrays:
import pyspark.sql.functions as F

# Read JSON (spark and sc are the active SparkSession and SparkContext)
df = spark.read.option("multiLine", "true").json(sc.parallelize([json_example]))
# Copy
df1 = df
# Get column names
df1 = df1.withColumn("colnames", F.col('Header.SignalList.Name'))
# Get data values: explode twice, first over the Payload entries,
# then over each row in every entry's Data array
df1 = df1.withColumn("values", F.explode('Payload.Data'))
df1 = df1.withColumn("values", F.explode('values'))
# Get metadata from Header
df1 = df1.withColumn("Foo", F.col("Header.Foo"))
df1 = df1.select(["foo","colnames", "values"])
df1.show(truncate=False)
Which results in this:
+---+----------------------+----------------------+
|foo|colnames |values |
+---+----------------------+----------------------+
|bar|[id, field01, field02]|[10001.0, 2.2, -102.4]|
|bar|[id, field01, field02]|[10002.0, 2.3, -102.3]|
|bar|[id, field01, field02]|[10003.0, 2.2, -102.4]|
|bar|[id, field01, field02]|[10004.0, 2.6, -102.4]|
|bar|[id, field01, field02]|[10005.0, 2.5, -102.5]|
|bar|[id, field01, field02]|[10006.0, 2.4, -102.3]|
|bar|[id, field01, field02]|[10007.0, 2.5, -102.5]|
|bar|[id, field01, field02]|[10008.0, 2.5, -102.5]|
|bar|[id, field01, field02]|[10009.0, 2.6, -102.4]|
+---+----------------------+----------------------+
I'm stuck trying to expand the ith value in the values field into a new column, using the ith name in the colnames field as the alias.
FWIW, my actual dataset will have around 100 columns in the array.
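(For reference, a minimal sketch of one way to pair the two arrays without a UDF, assuming Spark 2.4+ where map_from_arrays is available; the three column names are hardcoded here purely for illustration:)

# Sketch only: build a name -> value map per row, then select each key out as its own column
df2 = df1.withColumn("m", F.map_from_arrays(F.col("colnames"), F.col("values")))
df2 = df2.select("foo", *[F.col("m").getItem(n).alias(n) for n in ["id", "field01", "field02"]])
df2.show()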
I ended up collecting all the unique column names and using a UDF to look up the appropriate value for each column:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Define a UDF that looks up a name in the 'colnames' array and
# returns the value at the same index in the 'values' array
def lookup_by_name(name, colnames, values):
    if name in colnames:
        index = colnames.index(name)
        return values[index]
    else:
        return None

lookup_by_name_udf = F.udf(lookup_by_name, DoubleType())
# Get all unique column names across all rows & arrays in "colnames"
unique_colnames = [r.colnames for r in df1.withColumn("colnames", F.explode(col("colnames"))).select("colnames").distinct().collect()]
# Create a column for each unique name found in the "colnames" arrays
for name in unique_colnames:
    df1 = df1.withColumn(name, lookup_by_name_udf(F.lit(name), col("colnames"), col("values")))
# Drop the array columns
df1 = df1.drop('colnames','values')
# Show
df1.show(truncate=False)
Result:
+---+-------+-------+-------+
|foo|field02|field01|id |
+---+-------+-------+-------+
|bar|-102.4 |2.2 |10001.0|
|bar|-102.3 |2.3 |10002.0|
|bar|-102.4 |2.2 |10003.0|
|bar|-102.4 |2.6 |10004.0|
|bar|-102.5 |2.5 |10005.0|
|bar|-102.3 |2.4 |10006.0|
|bar|-102.5 |2.5 |10007.0|
|bar|-102.5 |2.5 |10008.0|
|bar|-102.4 |2.6 |10009.0|
+---+-------+-------+-------+
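Note that because the UDF returns DoubleType, id comes back as 10001.0 rather than 10001. If the integer form matters, a cast after the loop (a small optional step) restores it:

# Optional: restore id to an integer type after the UDF loop
df1 = df1.withColumn("id", F.col("id").cast("long"))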