Tags: pyspark, apache-spark-sql, time-series, missing-data, pyspark-pandas

What is the best practice for handling a non-datetime timestamp column within a pandas dataframe?


Let's say I have the following dataframe with a non-standard timestamp column that is not in datetime format. I need to add a new column that converts it into a 24-hourly timestamp for time-series visualization:

df['timestamp(24hrs)'] = round(df['timestamp(sec)'] / (24 * 3600))

and get this:

+----------------+----+-----+
|timestamp(24hrs)|User|count|
+----------------+----+-----+
|0.0             |U100|435  |
|1.0             |U100|1091 |
|2.0             |U100|992  |
|3.0             |U100|980  |
|4.0             |U100|288  |
|8.0             |U100|260  |
|9.0             |U100|879  |
|10.0            |U100|875  |
|11.0            |U100|911  |
|13.0            |U100|628  |
|14.0            |U100|642  |
|16.0            |U100|631  |
|17.0            |U100|233  |
 ...               ...  ...
|267.0           |U100|1056 |
|269.0           |U100|878  |
|270.0           |U100|256  |
+----------------+----+-----+
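
For reference, here is the same conversion written directly in PySpark rather than pandas syntax (a minimal sketch, assuming the raw column is literally named timestamp(sec) and df is a Spark DataFrame):

import pyspark.sql.functions as F

# Sketch: derive the 24-hourly bucket from the raw seconds column.
df = df.withColumn('timestamp(24hrs)', F.round(F.col('timestamp(sec)') / (24 * 3600)))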

Now I noticed that some records' timestamps are missing, and I need to impute the missing data so that:

  • timestamp(24hrs) runs in continuous order (no gaps)
  • the missing count values are filled with 0

Expected output:

+----------------+----+-----+
|timestamp(24hrs)|User|count|
+----------------+----+-----+
|0.0             |U100|435  |
|1.0             |U100|1091 |
|2.0             |U100|992  |
|3.0             |U100|980  |
|4.0             |U100|288  |
|5.0             |U100|0    |
|6.0             |U100|0    |
|7.0             |U100|0    |
|8.0             |U100|260  |
|9.0             |U100|879  |
|10.0            |U100|875  |
|11.0            |U100|911  |
|12.0            |U100|0    |
|13.0            |U100|628  |
|14.0            |U100|642  |
|15.0            |U100|0    |
|16.0            |U100|631  |
|17.0            |U100|233  |
 ...               ...  ...
|267.0           |U100|1056 |
|268.0           |U100|0    |
|269.0           |U100|878  |
|270.0           |U100|256  |
+----------------+----+-----+

Any idea how I can do this? Based on this answer for a standard timestamp column, I imagine I need to generate the timestamp(24hrs) range from the start to the end of the existing column and then use a left join() and crossJoin(), but I couldn't manage to get it working yet.

I've tried the following unsuccessfully:

import pyspark.sql.functions as F

all_dates_df = df.selectExpr(
    "sequence(min(timestamp(24hrs)), max(timestamp(24hrs)), interval 1 hour) as hour"
).select(F.explode("timestamp(24hrs)").alias("timestamp(24hrs)"))

all_dates_df.show()

result_df = all_dates_df.crossJoin(
    df.select("UserName").distinct()
).join(
    df, 
    ["count", "timestamp(24hrs)"],
    "left"
).fillna(0)

result_df.show()

Solution

  • The sequence function is available for integer types; it doesn't work on double, so the column has to be cast to integer and then cast back to double (if you want to keep it as a double).

    import pyspark.sql.functions as F
    from pyspark.sql.types import IntegerType, DoubleType
    
    # Cast to integer so sequence() works, explode the full range into rows,
    # then cast back to double to match the original column type.
    df_seq = (df.withColumn('time_int', F.col('timestamp(24hrs)').cast(IntegerType()))
              .select(F.explode(F.sequence(F.min('time_int'), F.max('time_int'))).alias('timestamp(24hrs)'))
              .select(F.col('timestamp(24hrs)').cast(DoubleType())))
    
    # Pair every bucket with every user, left-join the original counts, and fill gaps with 0.
    df = (df_seq.crossJoin(df.select("User").distinct())
          .join(df, on=['User', 'timestamp(24hrs)'], how='left')
          .fillna(0))
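
For a quick sanity check (a minimal usage sketch, using the same column names as above), sort by the bucket and confirm the previously missing rows now appear with a count of 0:

    # The gaps from the example (e.g. 5.0-7.0 and 12.0) should now show count 0.
    df.orderBy('timestamp(24hrs)').show(20, truncate=False)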