I have the following schema:
|--items : array
|-- element : struct
|-- id : long
|-- value : double
|-- stock : array
|-- element : string
How can I access the stock column to cast it to a struct using withColumn
?
I’ve tried with no success:
df = df.withColumn(
‘items’,
F.col(‘items’).withField(
‘stock’,
F.structure(
F.lit(None).cast(‘long’).alias(‘id’),
F.lit(None).cast(‘double’).alias(‘val’)
)
)
)
My desired output schema is:
|--items : array
|-- element : struct
|-- id : long
|-- value : double
|-- stock : array
|-- element : struct
|-- id : long
|-- val : double
You need to transform "stock" from an array of strings to an array of structs
So you need to use the explode
function on "items" array so data from there can go into separate rows.Then you need to use withColumn
to transform the "stock" array within these exploded rows. Finally you need to use collect_list
to reassemble the rows back into a single array.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
StructType,
StructField,
LongType,
DoubleType,
ArrayType,
StringType,
)
# Initialize Spark session
spark = SparkSession.builder.appName("Nested Schema Transformation").getOrCreate()
# Sample Data
data = [(1, [(1, 1.0, ["a", "b"]), (2, 2.0, ["c", "d"])])]
schema = StructType(
[
StructField("id", LongType()),
StructField(
"items",
ArrayType(
StructType(
[
StructField("id", LongType()),
StructField("value", DoubleType()),
StructField("stock", ArrayType(StringType())),
]
)
),
),
]
)
df = spark.createDataFrame(data, schema)
# Explode 'items' array
df_exploded = df.select("id", F.explode("items").alias("item"))
# Transform 'stock' to desired schema
transformed_stock_schema = ArrayType(
StructType([StructField("id", LongType()), StructField("val", DoubleType())])
)
df_transformed = df_exploded.withColumn(
"new_stock",
F.expr("transform(item.stock, x -> named_struct('id', CAST(x AS long), 'val', CAST(x AS double)))"))
# Collect back to original array format
df_final = df_transformed.groupBy("id").agg(
F.collect_list(F.struct("item.id", "item.value", "new_stock")).alias("items")
)
# Show the result schema
df_final.printSchema()
The resultant schema is:
root
|-- id: long (nullable = true)
|-- items: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- id: long (nullable = true)
| | |-- value: double (nullable = true)
| | |-- new_stock: array (nullable = true)
| | | |-- element: struct (containsNull = false)
| | | | |-- id: long (nullable = true)
| | | | |-- val: double (nullable = true)