I am trying to compute a "user session" column, defined by a rolling time window, in my Spark data.
I have been following this question for that: How to aggregate over rolling time window with groups in Spark
The difference in my case is that I'd like my time window to be about 5 hours, so I can't use datediff, which returns a number of days.
Here is my data set:
[Row(auction_id_64=9999, datetime=datetime.datetime(2016, 12, 5, 3, 42, 17), user_id_64=123),
Row(auction_id_64=8888, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=123),
Row(auction_id_64=5555, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=123),
Row(auction_id_64=4444, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=456),
Row(auction_id_64=3333, datetime=datetime.datetime(2016, 12, 7, 3, 40, 54), user_id_64=456),
Row(auction_id_64=7777, datetime=datetime.datetime(2016, 12, 7, 18, 42, 17), user_id_64=456),
Row(auction_id_64=6666, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=789),
Row(auction_id_64=2222, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=789),
Row(auction_id_64=1111, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=789),
Row(auction_id_64=1212, datetime=datetime.datetime(2016, 12, 9, 3, 40, 54), user_id_64=789)]
All I need is to add a column that indexes the sessions, per user. For example, auction_id 9999 is session 0, while auction_id 8888 and auction_id 5555 are session 1 (because there are many days between 9999 and 8888, but only a few minutes between 8888 and 5555). The indexing restarts at 0 for the next user.
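To make the expected output concrete, here is a plain-Python sketch (not Spark, just for illustration) that computes the session index I am after on the sample rows above, using the 5-hour gap rule:

```python
from datetime import datetime, timedelta

# (auction_id_64, datetime, user_id_64) rows, copied from the sample above
rows = [
    (9999, datetime(2016, 12, 5, 3, 42, 17), 123),
    (8888, datetime(2016, 12, 7, 3, 7, 23), 123),
    (5555, datetime(2016, 12, 7, 3, 2, 7), 123),
    (4444, datetime(2016, 12, 7, 3, 41, 49), 456),
    (3333, datetime(2016, 12, 7, 3, 40, 54), 456),
    (7777, datetime(2016, 12, 7, 18, 42, 17), 456),
    (6666, datetime(2016, 12, 7, 3, 7, 23), 789),
    (2222, datetime(2016, 12, 7, 3, 2, 7), 789),
    (1111, datetime(2016, 12, 7, 3, 41, 49), 789),
    (1212, datetime(2016, 12, 9, 3, 40, 54), 789),
]

GAP = timedelta(hours=5)
sessions = {}  # auction_id -> session index
for user in {r[2] for r in rows}:
    # Sort each user's events by time, then bump the index on every gap > 5 h
    events = sorted((r for r in rows if r[2] == user), key=lambda r: r[1])
    idx = 0
    prev = None
    for auction_id, ts, _ in events:
        if prev is not None and ts - prev > GAP:
            idx += 1
        sessions[auction_id] = idx
        prev = ts

print(sessions)
# e.g. 9999 -> 0, while 5555 and 8888 -> 1; 1212 -> 1 for user 789
```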
Here is my code:
from pyspark.sql.functions import abs, coalesce, lag, lit, sum, unix_timestamp
from pyspark.sql.window import Window

# Add a timestamp (integer) column
df = df.withColumn('timestamp', unix_timestamp(df['datetime']).cast('integer'))
# Partition by user and order by timestamp
w = Window.partitionBy("user_id_64").orderBy("timestamp")
# Absolute difference between this row's timestamp and the previous row's.
# If there is no previous row, return 0.
diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
# 1 if the difference exceeds 5 hours, else 0
indicator = (diff > 5 * 60 * 60).cast("integer")
# Running sum of the indicator gives the session index
subgroup = sum(indicator).over(w).alias("session_index")
# Keep everything
df = df.select("*", subgroup)
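For reference, the flag-then-running-sum logic I am aiming for can be checked in plain Python (toy timestamps in seconds, one user, already sorted):

```python
from itertools import accumulate

ts = [100, 200, 25000, 25100]  # hypothetical epoch-second timestamps
GAP = 5 * 60 * 60              # 5-hour threshold, 18000 seconds

# Mirrors lag() + coalesce(..., lit(0)): diff to previous row, 0 for the first
diffs = [0] + [b - a for a, b in zip(ts, ts[1:])]
# Mirrors the `indicator` column: 1 where a new session starts
flags = [int(d > GAP) for d in diffs]
# Running sum of the flags is the session index
print(list(accumulate(flags)))  # [0, 0, 1, 1]
```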
At the end, the session_index is 0 for every row. The issue comes from the line diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0)). Here, it is lit(0) that is returned every time (I checked by changing the 0 value). So I tried simplifying my script by changing a few lines:
test = "timestamp" - lag("timestamp", 1).over(w)
subgroup = sum(test).over(w).alias("session_index")
I removed the coalesce and the abs functions; session_index is None for every line.
If I replace test with test = "timestamp", it works: I get the accumulated sum of the timestamps.
If I replace it with test = lag("timestamp", 1).over(w), it works too: I get None for the first line of a user (as there is no previous line), and then the accumulated sum.
The issue appears when I try to subtract my two integers, but I don't understand why. They are two integers, so the result should be an integer too, shouldn't it?
Thanks for any help you can offer me.
It would be strange if it were a difference between two integers, but it is not. Let's look at the culprit once again:
coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
The left-hand operand of the subtraction is a str. str has no __sub__ that can operate on a Column, so Python falls back to the __rsub__ of the right-hand operand. In general, the dunder methods of Column interpret plain Python values as literals. So your code actually tries to subtract an integer from the string literal "timestamp", and the result is undefined (null), which is why coalesce always falls back to lit(0).
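A toy model of this dispatch (an illustration of the mechanism, not PySpark's real implementation) makes it visible: when the left operand is a str, Python calls the column's __rsub__, which wraps the string as a literal rather than a column reference.

```python
class FakeColumn:
    """Toy stand-in for pyspark.sql.Column's arithmetic dunders."""
    def __init__(self, expr):
        self.expr = expr

    def _as_col(self, other):
        # Plain Python values are wrapped as *literals*, like lit()
        return other if isinstance(other, FakeColumn) else FakeColumn(f"lit({other!r})")

    def __sub__(self, other):
        return FakeColumn(f"({self.expr} - {self._as_col(other).expr})")

    def __rsub__(self, other):
        # Python falls back here because str.__sub__ cannot handle a column
        return self._as_col(other) - self


c = FakeColumn("timestamp")
print(("timestamp" - c).expr)        # (lit('timestamp') - timestamp)
print((FakeColumn("a") - FakeColumn("b")).expr)  # (a - b)
```

The first expression subtracts the column from the literal string "timestamp", which is not what was intended.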
TL;DR You should use a Column as the left-hand operand:
from pyspark.sql.functions import col
coalesce(abs(col("timestamp") - lag("timestamp", 1).over(w)), lit(0))