Suppose I have a pyspark dataframe like:
timestamp | status |
---|---|
2024-01-12 10:00:00 | approved |
2024-01-12 11:30:00 | declined |
2024-01-12 12:45:00 | approved |
2024-01-12 14:20:00 | approved |
2024-01-12 15:30:00 | declined |
2024-01-12 16:45:00 | approved |
2024-01-12 18:00:00 | declined |
2024-01-12 19:15:00 | approved |
2024-01-12 20:30:00 | approved |
2024-01-12 22:00:00 | approved |
where timestamp denotes the time of the transaction, and status denotes whether the transaction was approved or declined.
What I want is something like the time elapsed since the most recent approved transaction:
timestamp | status | last_approved_time | time_since_last_approved |
---|---|---|---|
2024-01-12 10:00:00 | approved | null | -1 |
2024-01-12 11:30:00 | declined | 2024-01-12 10:00:00 | 5400 |
2024-01-12 12:45:00 | approved | 2024-01-12 10:00:00 | 9900 |
2024-01-12 14:20:00 | approved | 2024-01-12 12:45:00 | 15600 |
2024-01-12 15:30:00 | declined | 2024-01-12 14:20:00 | 19800 |
2024-01-12 16:45:00 | approved | 2024-01-12 14:20:00 | 24300 |
2024-01-12 18:00:00 | declined | 2024-01-12 16:45:00 | 28800 |
2024-01-12 19:15:00 | approved | 2024-01-12 16:45:00 | 33300 |
2024-01-12 20:30:00 | approved | 2024-01-12 19:15:00 | 37800 |
2024-01-12 22:00:00 | approved | 2024-01-12 20:30:00 | 43200 |
The values shown in time_since_last_approved are just placeholder numbers. What I want is the elapsed time in seconds, and wherever there is no previous approved transaction the value should be filled with -1.
Thanks in advance.
Probably the easiest/cleanest way to achieve this is by using window functions, but you could also use a combination of aggregations and joins. I will demonstrate the former approach below (a rough sketch of the join-based alternative is included at the end).
Using Window Functions
We can take the max of a when expression that checks the status, over a Window that is ordered by timestamp and ranges from the first row up to (but not including) the current row.
In essence this is a two-step process:

1. For every row, find the timestamp of the most recent previous approved transaction (last_approved_time) by taking the max of a when expression over that window.
2. Subtract last_approved_time from the current timestamp to get the elapsed seconds, using coalesce to fill -1 where no previous approved transaction exists.

The code below demonstrates how this works on your example (with slightly different data):
from pyspark.sql.functions import to_timestamp, max, when, coalesce, lit
from pyspark.sql.window import Window
# mock the transaction DataFrame
test_data = [
    ("2024-01-12 10:00:00", "declined"),
    ("2024-01-12 11:01:00", "approved"),
    ("2024-01-12 11:30:00", "declined"),
    ("2024-01-12 12:24:00", "declined"),
    ("2024-01-12 12:45:00", "approved"),
    ("2024-01-12 12:47:00", "declined"),
    ("2024-01-12 12:55:00", "approved")]

df_raw = spark.createDataFrame(test_data, ['timestamp_string', 'status'])
df_initial = df_raw.select(
    df_raw.status,
    to_timestamp(df_raw.timestamp_string).alias("timestamp")
)

# define the window: all rows strictly before the current one, ordered by timestamp
# (no partitionBy is used, so Spark will warn about moving all data to a single partition)
window = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, -1)

# create a DataFrame with the time of the last approved transaction per row
df_lagged = df_initial.withColumn(
    "last_approved_time",
    max(
        when(df_initial.status == "approved", df_initial.timestamp)
    ).over(window))

# add the timestamp difference in seconds, filling -1 where there is no previous
# approval, and display the result (display() is available in Databricks notebooks;
# use .show() elsewhere)
display(
    df_lagged.withColumn(
        "time_since_last_approved",
        coalesce(
            df_lagged.timestamp.cast("long") - df_lagged.last_approved_time.cast("long"),
            lit(-1))))
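For completeness, here is a rough sketch of the aggregation-and-join alternative mentioned at the start. It is only illustrative, not a drop-in implementation: the names df_approved, df_joined and df_result are mine, and it assumes each transaction has a unique timestamp (otherwise the groupBy would collapse rows). The idea is to left-join every transaction to all strictly earlier approved transactions and keep the most recent one:

import pyspark.sql.functions as F

# keep only the approved transactions and rename the timestamp for the join
df_approved = (df_initial
    .filter(F.col("status") == "approved")
    .select(F.col("timestamp").alias("approved_timestamp")))

# non-equi left join: pair every transaction with all strictly earlier approved
# transactions, then keep the most recent one per transaction
df_joined = (df_initial
    .join(df_approved, F.col("approved_timestamp") < F.col("timestamp"), "left")
    .groupBy("timestamp", "status")
    .agg(F.max("approved_timestamp").alias("last_approved_time")))

# same final step as above: elapsed seconds, -1 when there is no previous approval
df_result = df_joined.withColumn(
    "time_since_last_approved",
    F.coalesce(
        F.col("timestamp").cast("long") - F.col("last_approved_time").cast("long"),
        F.lit(-1)))

df_result.orderBy("timestamp").show(truncate=False)

On anything beyond small data the window approach is usually the better choice, because the non-equi join pairs each row with every earlier approved row before aggregating, which can blow up the intermediate result.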