
Pyspark vectorized UDF with conditional


I am trying to evaluate the following condition to add a column to a Spark DataFrame.

from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import StringType

@pandas_udf(returnType=StringType())
def test(email, headers):
    if email is not None:
        return email
    else:
        return headers.str.get("default")

What's the best way to check if email is null? I have tried several options, but nothing works.

res = df.withColumn("out", test(col("email"), col("headers")))

Even when col("email") is null, the else branch is never evaluated.


Solution

  • You'll be better off using built-in Spark SQL functions than defining a UDF; it will be much more performant.

    Something like this works:

    import pyspark.sql.functions as F
    from pyspark.sql.session import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    df = spark.createDataFrame(
        [
            ("user1@example.com", "header1"),
            ("user2@example.com", "header2"),
            (None, "header3"),
            ("user4@example.com", "header4"),
            (None, "header5"),
        ],
        ["email", "headers"],
    )
    
    
    df.withColumn(
        "out", F.when(F.col("email").isNull(), F.col("headers")).otherwise(F.col("email"))
    ).show()
    +-----------------+-------+-----------------+
    |            email|headers|              out|
    +-----------------+-------+-----------------+
    |user1@example.com|header1|user1@example.com|
    |user2@example.com|header2|user2@example.com|
    |             null|header3|          header3|
    |user4@example.com|header4|user4@example.com|
    |             null|header5|          header5|
    +-----------------+-------+-----------------+
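
    If you prefer a one-liner, the same null-fallback can be written with F.coalesce, which returns the first non-null value among its arguments for each row. A minimal sketch against the same df:

    # coalesce picks email when it is non-null, otherwise falls back to headers
    df.withColumn("out", F.coalesce(F.col("email"), F.col("headers"))).show()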
    

    The when function gives you the "if" functionality you're looking for; coalesce is simply a shorthand for this particular null-fallback pattern.
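
    If you do need to keep a pandas UDF (say, because your real logic is more involved), the reason your version never takes the else branch is that a pandas_udf receives whole pandas Series per batch, not scalar values, so `email is not None` is always True. Here is a sketch of a working vectorized version, assuming headers is a plain string column as in the example above (the function name is just illustrative):

    import pandas as pd
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import StringType

    @pandas_udf(returnType=StringType())
    def email_or_header(email: pd.Series, headers: pd.Series) -> pd.Series:
        # combine_first fills the nulls in `email` with the matching
        # values from `headers`, element-wise across the whole batch
        return email.combine_first(headers)

    df.withColumn("out", email_or_header(F.col("email"), F.col("headers"))).show()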