azure-databricks, pyspark

Lambda Expression + pySpark


I am trying to compare a column in a Spark DataFrame against a given date: if the column's date is less than the given date, add n hours, otherwise add x hours.

something like

addhours = lambda x, y: x + timedelta(hours=14) if x < y else x + timedelta(hours=10)

where y holds the specified static date, and then apply it to the DataFrame column

something like

df = df.withColumn("newDate", checkDate(df.Time, F.lit('2015-01-01')))

Here is a sample df:

from pyspark.sql import functions as F
import datetime
df = spark.createDataFrame([('America/NewYork', '2020-02-01 10:00:00'),('Africa/Nairobi', '2020-02-01 10:00:00')],["OriginTz", "Time"])

I am a bit new to Spark DataFrames :)


Solution

  • Use a when + otherwise expression instead of a udf.

    Example:

    from pyspark.sql import functions as F
    
    #casting Time to timestamp and the literal to date so that we can compare them in when
    df = spark.createDataFrame([('America/NewYork', '2020-02-01 10:00:00'),('Africa/Nairobi', '2003-02-01 10:00:00')],["OriginTz", "Time"]).\
    withColumn("literal",F.lit('2015-01-01').cast("date")).\
    withColumn("Time",F.col("Time").cast("timestamp"))
    
    df.show()
    #+---------------+-------------------+----------+
    #|       OriginTz|               Time|   literal|
    #+---------------+-------------------+----------+
    #|America/NewYork|2020-02-01 10:00:00|2015-01-01|
    #| Africa/Nairobi|2003-02-01 10:00:00|2015-01-01|
    #+---------------+-------------------+----------+
    
    #using unix_timestamp to convert to epoch seconds, adding 10*3600 (10 hrs) or 14*3600 (14 hrs), then converting back to timestamp format
    df.withColumn("new_date",F.when(F.col("Time") > F.col("literal"),F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss')  + 10 * 3600)).\
        otherwise(F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss')  + 14 * 3600))).\
    show()
    
    #+---------------+-------------------+----------+-------------------+
    #|       OriginTz|               Time|   literal|           new_date|
    #+---------------+-------------------+----------+-------------------+
    #|America/NewYork|2020-02-01 10:00:00|2015-01-01|2020-02-01 20:00:00|
    #| Africa/Nairobi|2003-02-01 10:00:00|2015-01-01|2003-02-02 00:00:00|
    #+---------------+-------------------+----------+-------------------+
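
    As a side note, since Time is already a timestamp you could skip the unix_timestamp round trip and add the hours with an INTERVAL expression; a minimal sketch of the same when/otherwise logic (assuming a Spark version that supports timestamp + interval arithmetic):

    #alternative sketch: add hours via INTERVAL arithmetic instead of unix_timestamp
    df.withColumn("new_date",F.when(F.col("Time") > F.col("literal"),F.col("Time") + F.expr("INTERVAL 10 HOURS")).\
        otherwise(F.col("Time") + F.expr("INTERVAL 14 HOURS"))).\
    show()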
    

    In case you don't want to add the literal value as a DataFrame column:

    lit_val='2015-01-01'
    
    df = spark.createDataFrame([('America/NewYork', '2020-02-01 10:00:00'),('Africa/Nairobi', '2003-02-01 10:00:00')],["OriginTz", "Time"]).\
    withColumn("Time",F.col("Time").cast("timestamp"))
    
    df.withColumn("new_date",F.when(F.col("Time") > F.lit(lit_val).cast("date"),F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss')  + 10 * 3600)).\
        otherwise(F.to_timestamp(F.unix_timestamp(F.col("Time"),'yyyy-MM-dd HH:mm:ss')  + 14 * 3600))).\
    show()
    
    #+---------------+-------------------+-------------------+
    #|       OriginTz|               Time|           new_date|
    #+---------------+-------------------+-------------------+
    #|America/NewYork|2020-02-01 10:00:00|2020-02-01 20:00:00|
    #| Africa/Nairobi|2003-02-01 10:00:00|2003-02-02 00:00:00|
    #+---------------+-------------------+-------------------+
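
    If you'd rather keep a lambda as in the question, it can be wrapped in a udf, though when + otherwise is preferred since a udf processes each row in Python and is slower. A rough sketch, assuming Time has already been cast to timestamp as above and the cutoff date is hard-coded:

    from pyspark.sql import functions as F
    from pyspark.sql.types import TimestampType
    import datetime
    
    cutoff = datetime.datetime(2015, 1, 1)
    
    #the question's lambda wrapped in a udf: add 14 hrs before the cutoff, else 10 hrs
    addhours = F.udf(lambda x: x + datetime.timedelta(hours=14 if x < cutoff else 10), TimestampType())
    
    df.withColumn("new_date", addhours(F.col("Time"))).show()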