
Difference between two integers returns None with PySpark?


I am trying to integrate a "user session" defined by a rolling time window, in my spark data.

I have been using this issue for that : How to aggregate over rolling time window with groups in Spark

The difference in my case is that I'd like my time window to be about 5 hours, so I can't use datediff, which returns a number of days.

Here is my data set:

[Row(auction_id_64=9999, datetime=datetime.datetime(2016, 12, 5, 3, 42, 17), user_id_64=123),
Row(auction_id_64=8888, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=123),
Row(auction_id_64=5555, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=123),
Row(auction_id_64=4444, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=456),
Row(auction_id_64=3333, datetime=datetime.datetime(2016, 12, 7, 3, 40, 54), user_id_64=456),
Row(auction_id_64=7777, datetime=datetime.datetime(2016, 12, 7, 18, 42, 17), user_id_64=456),
Row(auction_id_64=6666, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=789),
Row(auction_id_64=2222, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=789),
Row(auction_id_64=1111, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=789),
Row(auction_id_64=1212, datetime=datetime.datetime(2016, 12, 9, 3, 40, 54), user_id_64=789)]

All I need is to add a column that indexes the session, per user. For example, auction_id 9999 is session 0, and auction_id 8888 and auction_id 5555 are session 1 (because there are many days between 9999 and 8888, but only a few minutes between 8888 and 5555). The indexing restarts at 0 for the next user.
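To make the expected result concrete, here is a hypothetical pure-Python sketch (no Spark) of the session indexing I'm after: within one user's time-ordered events, any gap larger than 5 hours starts a new session, and the index restarts at 0 per user. The function name `session_indices` is just for illustration.

```python
from datetime import datetime, timedelta

def session_indices(timestamps, gap=timedelta(hours=5)):
    """Return a session index per timestamp (assumed sorted ascending)."""
    indices = []
    session = 0
    prev = None
    for ts in timestamps:
        # A gap larger than the threshold starts a new session.
        if prev is not None and ts - prev > gap:
            session += 1
        indices.append(session)
        prev = ts
    return indices

# user 123's events, sorted by time
user_123 = [
    datetime(2016, 12, 5, 3, 42, 17),
    datetime(2016, 12, 7, 3, 2, 7),
    datetime(2016, 12, 7, 3, 7, 23),
]
print(session_indices(user_123))  # [0, 1, 1]
```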

Here is my code:

from pyspark.sql.functions import abs, coalesce, lag, lit, sum, unix_timestamp
from pyspark.sql.window import Window

# Add a timestamp (integer) column
df = df.withColumn('timestamp', unix_timestamp(df['datetime']).cast('integer'))

# We partition by user and order by timestamp
w = Window.partitionBy("user_id_64").orderBy("timestamp")

# We compute the absolute difference between the timestamp and the timestamp
# of the previous line. If there is no previous line, 0 is returned.
diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))

# If difference higher than 5 hours
indicator = (diff > 5 * 60 * 60).cast("integer")

# We increment the index each time indicator = 1
subgroup = sum(indicator).over(w).alias("session_index")

# We get everything
df = df.select("*", subgroup)

At the end, session_index is 0 for every row. The issue comes from the line diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0)): it is lit(0) that is returned every time (I checked by changing the 0 value). So I tried simplifying my script by changing a few lines:

test = "timestamp" - lag("timestamp", 1).over(w)
subgroup = sum(test).over(w).alias("session_index")

I removed the coalesce and abs functions. Now session_index is None for every line.

If I replace test with test = "timestamp", it works fine: I get the accumulated sum of the timestamps.

If I replace it with test = lag("timestamp", 1).over(w), it works too: I get None for the first line of each user (as there is no previous line), and then the accumulated sum.

The issue appears when I try to subtract my two integers. But I don't understand why. They are two integers; the result should be an integer too, shouldn't it?

Thanks for any help you can offer me.


Solution

  • It would be strange if it were a difference between two integers, but it is not. Let's look at the culprit once again:

    coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
    

    The left-hand operand of the subtraction is a str. str has no __sub__ that can operate on a Column, so Python falls back to the __rsub__ of the right-hand operand. In general, the dunder methods of Column interpret standard Python types as literals. So your code actually tries to subtract an integer column from the literal string "timestamp", and the result is undefined (null).

    TL;DR You should use Column as the left-hand operand:

    from pyspark.sql.functions import col
    
    coalesce(abs(col("timestamp") - lag("timestamp", 1).over(w)), lit(0))
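    The dunder dispatch described above can be sketched in plain Python, with no Spark at all. `FakeColumn` below is a hypothetical stand-in for Spark's Column: since str defines no __sub__, Python falls back to the right-hand operand's __rsub__, which receives the bare string "timestamp" as its argument.

    ```python
    class FakeColumn:
        """Hypothetical stand-in for pyspark.sql.Column, for illustration only."""

        def __rsub__(self, other):
            # Spark's Column.__rsub__ would treat `other` as a literal value;
            # here we simply report what Python handed us.
            return f"__rsub__ called with literal {other!r}"

    # str has no __sub__, so Python dispatches to FakeColumn.__rsub__.
    print("timestamp" - FakeColumn())  # __rsub__ called with literal 'timestamp'
    ```

    Wrapping the left-hand side in col("timestamp") avoids this fallback entirely: Column.__sub__ then handles the operation and builds the column-minus-column expression you intended.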