I am trying to add a new column 'parsed_date' (a string parsed to a date) inside the array of structs in my pyspark dataframe. To do that I am using the dateparser.parse function, as my dates can have unpredictable formats. I am using transform() and withField() to process the array.
I have tried the following code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, transform
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DateType
import dateparser
# Sample dataframe
data = [
(1, [{"date_field": "january 2023", "detail": "detail1"}, {"date_field": "2011", "detail": "detail2"}]),
(2, [{"date_field": "2021-07-15", "detail": "detail3"}])
]
schema = StructType([
StructField("id", StringType(), True),
StructField("array_of_structs", ArrayType(
StructType([
StructField("date_field", StringType(), True),
StructField("detail", StringType(), True)
])
), True)
])
# NOTE: assumes an active SparkSession bound to `spark` (e.g. from spark-submit / notebook).
df = spark.createDataFrame(data, schema)
# UDF function to parse the dates
# NOTE: no returnType is given, so the UDF defaults to StringType.
@udf
def my_udf(x):
return dateparser.parse(x)
# Applying the UDF for array of structs
# This is the failing step: transform() is a SQL higher-order function whose
# lambda runs inside Catalyst expression evaluation, which cannot invoke a
# Python UDF — hence the INTERNAL_ERROR "Cannot evaluate expression" below.
result = df.withColumn("array_of_structs", transform(
"array_of_structs",
lambda x: x.withField("parsed_date", my_udf(x["date_field"]))
))
result.show(truncate=False)
But I am getting the following error:
org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: my_udf(lambda x_24#970.date_field)#968
I cannot figure out how to use the transform() function together with the UDF. Any help would be appreciated!
You cannot use `F.transform` with a Python UDF, and you have to provide a `returnType` for your UDF. Instead, pass the whole array to the UDF and rebuild it in Python. You can adjust the example below to use `dateparser.parse` in place of `datetime.fromisoformat`.
from datetime import datetime

from pyspark.sql import types as T
from pyspark.sql import functions as F

# Sample data mirroring the question's schema.
data = [
    (1, [{"date_field": "january 2023", "detail": "detail1"}, {"date_field": "2011", "detail": "detail2"}]),
    (2, [{"date_field": "2021-07-15", "detail": "detail3"}])
]
schema = T.StructType([
    T.StructField('id', T.StringType(), True),
    T.StructField('array_of_structs', T.ArrayType(
        T.StructType([
            T.StructField('date_field', T.StringType(), True),
            T.StructField('detail', T.StringType(), True)
        ])
    ), True)
])
df = spark.createDataFrame(data, schema)

# The UDF must declare its return type explicitly: the whole array is
# rebuilt in Python, with a new DateType field appended to each struct.
return_type = T.ArrayType(
    T.StructType([
        T.StructField('date_field', T.StringType(), True),
        T.StructField('detail', T.StringType(), True),
        T.StructField('parsed_date', T.DateType(), True)
    ]), True
)

@F.udf(returnType=return_type)
def my_udf(array_struct: list) -> list:
    """Return a copy of the input array of structs with a 'parsed_date'
    field added; unparseable date strings yield None (SQL NULL)."""
    array = []
    for struct in array_struct:
        try:
            # Swap in dateparser.parse(...) here for free-form date strings.
            parsed_date = datetime.fromisoformat(struct['date_field'])
        except ValueError:
            parsed_date = None
        array.append({
            'date_field': struct['date_field'],
            'detail': struct['detail'],
            'parsed_date': parsed_date
        })
    return array

new_struct = my_udf('array_of_structs').alias('new_array_of_structs')
df.select(new_struct).show(2, False)
# +------------------------------------------------------+
# |new_array_of_structs |
# +------------------------------------------------------+
# |[{january 2023, detail1, null}, {2011, detail2, null}]|
# |[{2021-07-15, detail3, 2021-07-15}] |
# +------------------------------------------------------+