
How to get a value from the PySpark function current_timestamp()


I want to use a default value in the lead function.
I have a column subs_action_date of type timestamp.
I use lead, and when it returns null, I want it to put the current timestamp instead.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import TimestampType

spark = SparkSession.builder.appName("test").getOrCreate()

df = spark.createDataFrame(data=[(1, 1, "2023-08-01 8:40"), (1, 1, "2023-08-01 8:55")],
                           schema=["sk_subs_id", "base_stat_id", "subs_action_date"])
df = df.withColumn("subs_action_date", F.col("subs_action_date").cast(TimestampType()))

w = Window.partitionBy("sk_subs_id").orderBy("subs_action_date")
df.select('*', F.lead("subs_action_date", 1, F.current_timestamp()).over(w).alias("next_action_date")).show()

[Image: schema of the DataFrame]

But I receive an error: Column is not iterable.
I read that the default argument of lead must be a value, but I am passing a Column.
The function current_timestamp() returns a Column. How can I get a value from current_timestamp()?

I thought about using the function expr(), but I don't understand how I could use it.


Solution

  • You can get the current timestamp from a separate query, collect it to the driver as a plain Python value, and pass that value as the default of lead:

    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession, Window
    from pyspark.sql.types import TimestampType
    
    spark = SparkSession.builder.appName("test").getOrCreate()
    
    df = spark.createDataFrame(
        data=[(1, 1, "2023-08-01 8:40"), (1, 1, "2023-08-01 8:55")],
        schema=["sk_subs_id", "base_stat_id", "subs_action_date"]
    )
    df = df.withColumn("subs_action_date", F.col("subs_action_date").cast(TimestampType()))
    
    w = Window.partitionBy("sk_subs_id").orderBy("subs_action_date")
    
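    # Evaluate CURRENT_TIMESTAMP() in a one-row query and collect the result,
    # so current_ts is a Python datetime rather than a Column.
    temp_df = spark.sql('SELECT CURRENT_TIMESTAMP() AS current_ts')
    current_ts = temp_df.collect()[0]['current_ts']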
    df2 = df.select(
        '*',
        F.lead("subs_action_date", 1, current_ts).over(w).alias("next_action_date")
    )
    df2.show(10, False)
    
    # +----------+------------+-------------------+--------------------------+
    # |sk_subs_id|base_stat_id|subs_action_date   |next_action_date          |
    # +----------+------------+-------------------+--------------------------+
    # |1         |1           |2023-08-01 08:40:00|2023-08-01 08:55:00       |
    # |1         |1           |2023-08-01 08:55:00|2023-08-16 08:40:19.639075|
    # +----------+------------+-------------------+--------------------------+