Please help me with this PySpark code. I need to count the number of times an IP appeared in the last 24 hours, excluding the current instance. The first time an IP appears in the data, the count_last24hrs column should return 0. From then on, the code should count how many times the same IP appeared in the 24 hours before that row's timestamp, excluding the row itself.
I was trying to use a window function but was not getting the desired result.
count_last24hrs is the column name in which the result should appear.
Using this DataFrame as df, with columns (datetime, ip, count_last24hrs):
(10/05/2022 10:14:00 AM, 1.1.1.1, 0)
(10/05/2022 10:16:00 AM, 1.1.1.1, 1)
(10/05/2022 10:18:00 AM, 2.2.2.2, 0)
(10/05/2022 10:21:00 AM, 1.1.1.1, 2)
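For reference, the sample can be built as a Spark DataFrame roughly like this (a sketch; the to_timestamp format string dd/MM/yyyy hh:mm:ss a is an assumption based on the values above):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# sample rows matching the data above (count_last24hrs is the expected output)
data = [
    ("10/05/2022 10:14:00 AM", "1.1.1.1"),
    ("10/05/2022 10:16:00 AM", "1.1.1.1"),
    ("10/05/2022 10:18:00 AM", "2.2.2.2"),
    ("10/05/2022 10:21:00 AM", "1.1.1.1"),
]
df = spark.createDataFrame(data, ["datetime", "ip"])

# parse the string into a proper timestamp so a range window can be applied
df = df.withColumn("datetime", F.to_timestamp("datetime", "dd/MM/yyyy hh:mm:ss a"))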
Code that I was trying:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# function to calculate the number of seconds from the number of days
days = lambda i: i * 86400

# create window by casting timestamp to long (number of seconds)
w = Window.orderBy(F.col("datetime").cast("long")).rangeBetween(-days(1), 0)

# use collect_set and size functions to perform countDistinct over a window
df_new = df.withColumn("count_last24hrs", F.size(F.collect_set("ip").over(w)))
result = (
    df
    .withColumn('ip_count', F.expr("count(ip_address) over (partition by ip_address "
                                   "order by datetimecol range between interval 24 hours preceding and current row)"))
    .withColumn('ip_count', F.when(F.col('ip_count') == 0, 0).otherwise(F.col('ip_count') - 1))
    .select('datetimecol', 'ip_address', 'ip_count')
)
The first withColumn partitions the data by "ip_address", orders it by time, and counts the occurrences of each ip_address in the 24 hours up to and including the current row.
The second withColumn decrements that count by 1, so the current row is excluded and the first occurrence returns 0 instead of 1.
Result:

| datetimecol | ip_address | ip_count |
|---|---|---|
| 2022-05-10 10:14:00 | 1.1.1.1 | 0 |
| 2022-05-10 10:16:00 | 1.1.1.1 | 1 |
| 2022-05-10 10:18:00 | 2.2.2.2 | 0 |
| 2022-05-10 10:21:00 | 1.1.1.1 | 2 |
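If it helps, the same result can also be expressed with the DataFrame Window API instead of a SQL expression. This is only a sketch, assuming the columns are named datetimecol and ip_address and that datetimecol is a proper timestamp:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# per-IP window over the ordering column expressed in epoch seconds,
# covering the 24 hours (86400 seconds) up to and including the current row
w = (
    Window
    .partitionBy("ip_address")
    .orderBy(F.col("datetimecol").cast("long"))
    .rangeBetween(-86400, 0)
)

# count rows in the window, then subtract 1 to exclude the current row,
# so the first occurrence of an IP yields 0
result = df.withColumn("ip_count", F.count("ip_address").over(w) - 1)

Like the SQL range frame above, this counts rows with an identical timestamp as peers of the current row; the final subtraction of 1 is what gives the "excluding that instance" behaviour.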