Let's say I have the following pandas dataframe with a non-standard timestamp column without datetime format. Due to I need to include a new column and convert it into an 24hourly-based timestamp for time-series visualizing matter by:
df['timestamp(24hrs)'] = round(df['timestamp(sec)']/24*3600)
and get this:
+----------------+----+-----+
|timestamp(24hrs)|User|count|
+----------------+----+-----+
|0.0 |U100|435 |
|1.0 |U100|1091 |
|2.0 |U100|992 |
|3.0 |U100|980 |
|4.0 |U100|288 |
|8.0 |U100|260 |
|9.0 |U100|879 |
|10.0 |U100|875 |
|11.0 |U100|911 |
|13.0 |U100|628 |
|14.0 |U100|642 |
|16.0 |U100|631 |
|17.0 |U100|233 |
... ... ...
|267.0 |U100|1056 |
|269.0 |U100|878 |
|270.0 |U100|256 |
+----------------+----+-----+
Now I noticed that some records' timestamps are missing, and I need to impute those missing data:
timestamp(24hrs)
in continuous ordercount
value by 0
Expected output:
+----------------+----+-----+
|timestamp(24hrs)|User|count|
+----------------+----+-----+
|0.0 |U100|435 |
|1.0 |U100|1091 |
|2.0 |U100|992 |
|3.0 |U100|980 |
|4.0 |U100|288 |
|5.0 |U100|0 |
|6.0 |U100|0 |
|7.0 |U100|0 |
|8.0 |U100|260 |
|9.0 |U100|879 |
|10.0 |U100|875 |
|11.0 |U100|911 |
|12.0 |U100|0 |
|13.0 |U100|628 |
|14.0 |U100|642 |
|15.0 |U100|0 |
|16.0 |U100|631 |
|17.0 |U100|233 |
... ... ...
|267.0 |U100|1056 |
|268.0 |U100|0 |
|269.0 |U100|878 |
|270.0 |U100|256 |
+----------------+----+-----+
Any idea how can I do this? Based on this answer over standard timestamp, I can imagine I need to create a new column timestamp(24hrs)
from the start and end of the previous one and do left join()
& crossJoin()
but I couldn't manage it yet.
I've tried the following unsuccessfully:
import pyspark.sql.functions as F
all_dates_df = df.selectExpr(
"sequence(min(timestamp(24hrs)), max(timestamp(24hrs)), interval 1 hour) as hour"
).select(F.explode("timestamp(24hrs)").alias("timestamp(24hrs)"))
all_dates_df.show()
result_df = all_dates_df.crossJoin(
df.select("UserName").distinct()
).join(
df,
["count", "timestamp(24hrs)"],
"left"
).fillna(0)
result_df.show()
sequence
function is available for integer. It doesn't work for double type so it requires to cast to integer then cast back to double (if you want to retain as double).
df_seq = (df.withColumn('time_int', F.col('timestamp(24hrs)').cast(IntegerType()))
.select(F.explode(F.sequence(F.min('time_int'), F.max('time_int'))).alias('timestamp(24hrs)'))
.select(F.col('timestamp(24hrs)').cast(DoubleType())))
df = (df_seq.crossJoin(df.select("User").distinct())
.join(df, on=['User', 'timestamp(24hrs)'], how='left')
.fillna(0))