Tags: pyspark, timestamp, apache-spark-sql, windowing

Spark: counting on a window not working at millisecond precision


You can create a window to count the number of times a record has occurred in the last 7 days. However, once records differ only at the millisecond level, the count breaks down.

In short, the expression df.timestamp.astype('Timestamp').cast("long") used below converts the timestamp to a long only at whole-second granularity; the milliseconds are discarded. How do you turn the entire timestamp, milliseconds included, into a long? The value needs to be a long so that it works with the window's rangeBetween.

from pyspark.sql import Window
from pyspark.sql import functions as F

df = sqlContext.createDataFrame([
        ("a", "u8u", "2018-02-02 05:46:41.438357"),
        ("a", "u8u", "2018-02-02 05:46:41.439377"),
        ("a", "a3a", "2018-02-02 09:48:34.081818"),
        ("a", "a3a", "2018-02-02 09:48:34.095586"),
        ("a", "g8g", "2018-02-02 09:48:56.006206"),
        ("a", "g8g", "2018-02-02 09:48:56.007974"),
        ("a", "9k9", "2018-02-02 12:50:48.000000"),
        ("a", "9k9", "2018-02-02 12:50:48.100000"),
], ["person_id", "session_id", "timestamp"])


df = df.withColumn('unix_ts', df.timestamp.astype('Timestamp').cast("long"))  # whole seconds only
df = df.withColumn("DayOfWeek", F.date_format(df.timestamp, 'EEEE'))

# Count prior occurrences in the trailing 7 days, excluding the current row.
w = Window.partitionBy('person_id', 'DayOfWeek').orderBy('unix_ts').rangeBetween(-86400*7, -1)
df = df.withColumn('count', F.count('unix_ts').over(w))
df.sort(df.unix_ts).show(20, False)


+---------+----------+--------------------------+----------+---------+-----+
|person_id|session_id|timestamp                 |unix_ts   |DayOfWeek|count|
+---------+----------+--------------------------+----------+---------+-----+
|a        |u8u       |2018-02-02 05:46:41.438357|1517572001|Friday   |0    |
|a        |u8u       |2018-02-02 05:46:41.439377|1517572001|Friday   |0    |
|a        |a3a       |2018-02-02 09:48:34.081818|1517586514|Friday   |2    |
|a        |a3a       |2018-02-02 09:48:34.095586|1517586514|Friday   |2    |
|a        |g8g       |2018-02-02 09:48:56.006206|1517586536|Friday   |4    |
|a        |g8g       |2018-02-02 09:48:56.007974|1517586536|Friday   |4    |
|a        |9k9       |2018-02-02 12:50:48.000000|1517597448|Friday   |6    |
|a        |9k9       |2018-02-02 12:50:48.100000|1517597448|Friday   |6    |
+---------+----------+--------------------------+----------+---------+-----+

The count should be 0,1,2,3,4,5... instead of 0,0,2,2,4,4,...
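
As a minimal check, comparing a long cast with a double cast makes the truncation visible (Spark renders a timestamp cast to double as fractional epoch seconds; the as_long/as_double aliases are just for illustration):

df.select(
    "timestamp",
    df.timestamp.astype('Timestamp').cast("long").alias("as_long"),     # floored to whole seconds
    df.timestamp.astype('Timestamp').cast("double").alias("as_double")  # fractional seconds kept
).show(truncate=False)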


Solution

  • You can use pyspark.sql.functions.unix_timestamp() with an explicit format string to convert the string column to a unix timestamp, instead of casting it to a timestamp and then to a long.

    import pyspark.sql.functions as F
    df.select(
        "timestamp",
        F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSSSSS").alias("unix_ts")
    ).show(truncate=False)
    #+--------------------------+----------+
    #|timestamp                 |unix_ts   |
    #+--------------------------+----------+
    #|2018-02-02 05:46:41.438357|1517568839|
    #|2018-02-02 05:46:41.439377|1517568840|
    #|2018-02-02 09:48:34.081818|1517582995|
    #|2018-02-02 09:48:34.095586|1517583009|
    #|2018-02-02 09:48:56.006206|1517582942|
    #|2018-02-02 09:48:56.007974|1517582943|
    #|2018-02-02 12:50:48.862644|1517594710|
    #|2018-02-02 12:50:49.981848|1517594830|
    #+--------------------------+----------+
    

    The second argument to unix_timestamp() is the format string. In your case, use "yyyy-MM-dd HH:mm:ss.SSSSSS".


    The corresponding change applied to your code would be:

    df = df.withColumn(
        'unix_ts',
        F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSSSSS")
    )
    df = df.withColumn("DayOfWeek", F.date_format(df.timestamp, 'EEEE'))
    
    w = Window.partitionBy('person_id', 'DayOfWeek').orderBy('unix_ts').rangeBetween(-86400*7, -1)
    df = df.withColumn('count', F.count('unix_ts').over(w))
    df.sort(df.unix_ts).show(20, False)
    #+---------+----------+--------------------------+----------+---------+-----+
    #|person_id|session_id|timestamp                 |unix_ts   |DayOfWeek|count|
    #+---------+----------+--------------------------+----------+---------+-----+
    #|a        |u8u       |2018-02-02 05:46:41.438357|1517568839|Friday   |0    |
    #|a        |u8u       |2018-02-02 05:46:41.439377|1517568840|Friday   |1    |
    #|a        |g8g       |2018-02-02 09:48:56.006206|1517582942|Friday   |2    |
    #|a        |g8g       |2018-02-02 09:48:56.007974|1517582943|Friday   |3    |
    #|a        |a3a       |2018-02-02 09:48:34.081818|1517582995|Friday   |4    |
    #|a        |a3a       |2018-02-02 09:48:34.095586|1517583009|Friday   |5    |
    #|a        |9k9       |2018-02-02 12:50:48.862644|1517594710|Friday   |6    |
    #|a        |9k9       |2018-02-02 12:50:49.981848|1517594830|Friday   |7    |
    #+---------+----------+--------------------------+----------+---------+-----+
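
  • A further sketch, not the approach above: casting the timestamp to a double keeps the fractional seconds, since Spark renders a timestamp cast to double as fractional epoch seconds. Assuming a Spark version whose range frames accept a non-integral order column, the 7-day frame can end at the current row with 1 subtracted from the count, so records in the last whole second are no longer dropped (ts_double is an illustrative name):

    df = df.withColumn(
        'ts_double',
        df.timestamp.astype('Timestamp').cast("double")  # epoch seconds, microseconds kept as a fraction
    )
    # Count rows in the trailing 7 days up to and including the current row,
    # then subtract 1 so the current row itself is excluded.
    w = Window.partitionBy('person_id', 'DayOfWeek').orderBy('ts_double').rangeBetween(-86400*7, 0)
    df = df.withColumn('count', F.count('ts_double').over(w) - 1)
    df.sort(df.ts_double).show(20, False)

    With distinct sub-second timestamps this yields the 0,1,2,3,... sequence from the question; if two records ever shared an identical timestamp they would count each other, since a range frame is defined on values rather than rows.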