Tags: python, datetime, pyspark, time

How to calculate the time elapsed since the most recent approved transaction in PySpark?


Suppose I have a PySpark DataFrame like this:

timestamp status
2024-01-12 10:00:00 approved
2024-01-12 11:30:00 declined
2024-01-12 12:45:00 approved
2024-01-12 14:20:00 approved
2024-01-12 15:30:00 declined
2024-01-12 16:45:00 approved
2024-01-12 18:00:00 declined
2024-01-12 19:15:00 approved
2024-01-12 20:30:00 approved
2024-01-12 22:00:00 approved

where timestamp denotes the time of the transaction, and status denotes whether the transaction was approved or declined.

What I want is something like the time elapsed since the most recent approved transaction:

timestamp status last_approved_time time_since_last_approved
2024-01-12 10:00:00 approved null -1
2024-01-12 11:30:00 declined 2024-01-12 10:00:00 5400
2024-01-12 12:45:00 approved 2024-01-12 10:00:00 9900
2024-01-12 14:20:00 approved 2024-01-12 12:45:00 15600
2024-01-12 15:30:00 declined 2024-01-12 14:20:00 19800
2024-01-12 16:45:00 approved 2024-01-12 14:20:00 24300
2024-01-12 18:00:00 declined 2024-01-12 16:45:00 28800
2024-01-12 19:15:00 approved 2024-01-12 16:45:00 33300
2024-01-12 20:30:00 approved 2024-01-12 19:15:00 37800
2024-01-12 22:00:00 approved 2024-01-12 20:30:00 43200

The values in the time_since_last_approved column are just placeholder numbers. Where there is no previous approved transaction, I want to fill the value with -1.
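
For reference, the input DataFrame above can be recreated like this (a minimal sketch, assuming an active SparkSession named spark):

    from pyspark.sql.functions import to_timestamp

    # recreate the example input shown above
    data = [
        ("2024-01-12 10:00:00", "approved"),
        ("2024-01-12 11:30:00", "declined"),
        ("2024-01-12 12:45:00", "approved"),
        ("2024-01-12 14:20:00", "approved"),
        ("2024-01-12 15:30:00", "declined"),
        ("2024-01-12 16:45:00", "approved"),
        ("2024-01-12 18:00:00", "declined"),
        ("2024-01-12 19:15:00", "approved"),
        ("2024-01-12 20:30:00", "approved"),
        ("2024-01-12 22:00:00", "approved")]

    df = spark.createDataFrame(data, ["timestamp", "status"]).withColumn(
        "timestamp", to_timestamp("timestamp"))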

Thanks in advance.


Solution

  • Probably the easiest/cleanest way to achieve this is with window functions, but you could also use a combination of aggregations and joins. I will demonstrate the former approach below; a rough sketch of the join-based alternative is included at the end.

    Using Window Functions

    We can run a max function with a when condition that checks the status, over a Window that is ordered by timestamp and ranges from the oldest row up to the row just before the current one.

    In essence this is a two-step process:

    1. Get the timestamp of the most recent preceding approved transaction
    2. Calculate the time difference and handle nulls

    The code below demonstrates how this would work on your example (using slightly different data):

    from pyspark.sql.functions import to_timestamp, max, when, coalesce, lit
    from pyspark.sql.window import Window
    
    # mock the transaction DataFrame (assumes an active SparkSession named `spark`)
    test_data = [
        ("2024-01-12 10:00:00", "declined"),
        ("2024-01-12 11:01:00", "approved"),
        ("2024-01-12 11:30:00", "declined"),
        ("2024-01-12 12:24:00", "declined"),
        ("2024-01-12 12:45:00", "approved"),
        ("2024-01-12 12:47:00", "declined"),
        ("2024-01-12 12:55:00", "approved")]
    
    df_raw = spark.createDataFrame(test_data, ['timestamp_string', 'status'])
    df_initial = df_raw.select(
        df_raw.status,
        to_timestamp(df_raw.timestamp_string).alias("timestamp")
    )
    
    # define a window over all rows strictly before the current row, ordered by timestamp
    # (no partitionBy, so Spark will move all rows to a single partition -- fine for small data)
    window = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, -1)
    
    # the when() yields the timestamp only for approved rows, so max() over the window
    # returns the latest preceding approved timestamp (null if there is none)
    df_lagged = df_initial.withColumn(
        "last_approved_time", 
        max(
            when(df_initial.status == "approved", df_initial.timestamp)
        ).over(window))
    
    # add the timestamp difference in seconds, filling -1 where no previous approval exists,
    # and display the result (display() is the Databricks notebook helper; use .show() elsewhere)
    display(
        df_lagged.withColumn(
            "time_since_last_approved", 
            coalesce(df_lagged.timestamp.cast("long") - df_lagged.last_approved_time.cast("long"), lit(-1))))
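
    Alternative: Aggregations and Joins

    For completeness, here is a rough sketch of the aggregation-and-join alternative mentioned above. It left-joins every transaction to all earlier approved transactions and keeps the latest one per row. It assumes timestamps are unique (the groupBy would otherwise collapse duplicate rows), and the non-equi join can get expensive on large data, which is why the window-function approach is usually preferable.

    from pyspark.sql import functions as F

    # all approved transaction times, renamed to avoid a column clash in the join
    approved = df_initial.filter(F.col("status") == "approved").select(
        F.col("timestamp").alias("approved_timestamp"))

    # left-join each transaction to every *earlier* approved transaction,
    # then keep only the most recent of those per transaction
    df_joined = (
        df_initial.alias("t")
        .join(approved, F.col("approved_timestamp") < F.col("t.timestamp"), "left")
        .groupBy("t.timestamp", "t.status")
        .agg(F.max("approved_timestamp").alias("last_approved_time"))
        .withColumn(
            "time_since_last_approved",
            F.coalesce(
                F.col("timestamp").cast("long") - F.col("last_approved_time").cast("long"),
                F.lit(-1))))

    df_joined.orderBy("timestamp").show(truncate=False)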