
Pyspark transform function with UDF not working


I am trying to add a new column 'parsed_date' (a string parsed to a date) inside the array of structs in my pyspark dataframe. Since my dates can come in unpredictable formats, I am using the dateparser.parse function, combined with transform() and withField(), to process the array.

I have tried the following code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, transform
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
import dateparser

# Sample dataframe

data = [
    (1, [{"date_field": "january 2023", "detail": "detail1"}, {"date_field": "2011", "detail": "detail2"}]),
    (2, [{"date_field": "2021-07-15", "detail": "detail3"}])
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("array_of_structs", ArrayType(
        StructType([
            StructField("date_field", StringType(), True),
            StructField("detail", StringType(), True)
        ])
    ), True)
])

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data, schema)

# UDF function to parse the dates
@udf
def my_udf(x):
    return dateparser.parse(x)

# Applying the UDF for array of structs
result = df.withColumn("array_of_structs", transform(
    "array_of_structs",
    lambda x: x.withField("parsed_date", my_udf(x["date_field"]))  
))

result.show(truncate=False)

But I am getting the following error:

org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: my_udf(lambda x_24#970.date_field)#968

I cannot figure out how to use the transform() function together with the UDF. Any help would be appreciated!


Solution

  • You cannot use F.transform with a Python UDF: the lambda passed to a higher-order function must be built from native Column expressions, which is why Spark fails with an internal error when it tries to evaluate the UDF inside it. You also have to provide a returnType for your UDF (the default is StringType). The example below applies the UDF to the whole array column instead; you can adjust it to use dateparser.

    from datetime import datetime

    from pyspark.sql import types as T
    from pyspark.sql import functions as F
    
    data = [
        (1, [{"date_field": "january 2023", "detail": "detail1"}, {"date_field": "2011", "detail": "detail2"}]),
        (2, [{"date_field": "2021-07-15", "detail": "detail3"}])
    ]
    
    schema = T.StructType([
        T.StructField('id', T.IntegerType(), True),
        T.StructField('array_of_structs', T.ArrayType(
            T.StructType([
                T.StructField('date_field', T.StringType(), True),
                T.StructField('detail', T.StringType(), True)
            ])
        ), True)
    ])
    
    df = spark.createDataFrame(data, schema)
    
    return_type = T.ArrayType(
        T.StructType([
            T.StructField('date_field', T.StringType(), True),
            T.StructField('detail', T.StringType(), True),
            T.StructField('parsed_date', T.DateType(), True)
        ]), True
    )
    
    
    @F.udf(returnType=return_type)
    def my_udf(array_struct: list) -> list:
        array = []
        for struct in array_struct:
            try:
                # Stand-in parser; see the dateparser variant below for free-form dates
                parsed_date = datetime.fromisoformat(struct['date_field']).date()
            except ValueError:
                parsed_date = None
    
            array.append({
                'date_field': struct['date_field'],
                'detail': struct['detail'],
                'parsed_date': parsed_date
            })
    
        return array
    
    
    # Apply the UDF to the whole array column; a column name string is accepted
    new_struct = my_udf('array_of_structs').alias('new_array_of_structs')
    df.select(new_struct).show(2, False)
    
    # +------------------------------------------------------+
    # |new_array_of_structs                                  |
    # +------------------------------------------------------+
    # |[{january 2023, detail1, null}, {2011, detail2, null}]|
    # |[{2021-07-15, detail3, 2021-07-15}]                   |
    # +------------------------------------------------------+
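
  • To use dateparser as the question intends, the UDF body can swap datetime.fromisoformat for dateparser.parse, which returns a datetime (or None when it cannot parse the string). A minimal sketch of that adjustment, assuming dateparser is installed on the executors (parse_with_dateparser is just an illustrative name):

    import dateparser

    @F.udf(returnType=return_type)
    def parse_with_dateparser(array_struct: list) -> list:
        array = []
        for struct in array_struct:
            # dateparser handles free-form inputs such as "january 2023" or "2011"
            parsed = dateparser.parse(struct['date_field'])
            array.append({
                'date_field': struct['date_field'],
                'detail': struct['detail'],
                'parsed_date': parsed.date() if parsed else None
            })
        return array

    df.select(parse_with_dateparser('array_of_structs').alias('new_array_of_structs')).show(2, False)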
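
  • If the possible formats are known up front, you can avoid a UDF entirely and keep F.transform, because withField accepts native Column expressions such as F.to_date. A sketch under that assumption; the patterns below are examples to adapt to your data, and with ANSI mode off (the default) each to_date simply yields null when its pattern does not match:

    result = df.withColumn('array_of_structs', F.transform(
        'array_of_structs',
        lambda x: x.withField('parsed_date', F.coalesce(
            F.to_date(x['date_field'], 'yyyy-MM-dd'),  # e.g. 2021-07-15
            F.to_date(x['date_field'], 'MMMM yyyy'),   # pattern matching is case-sensitive
            F.to_date(x['date_field'], 'yyyy')         # e.g. 2011
        ))
    ))
    result.show(truncate=False)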