python · pyspark · timestamp · unix-timestamp

Round timestamp to nearest 30 seconds


I have a column in a DataFrame that contains timestamps in the format yyyy-MM-dd HH:mm:ss. I need to round each timestamp to the nearest 30 seconds.

old column                   desired column
2016-02-09 19:31:02          2016-02-09 19:31:00  
2016-02-09 19:31:35          2016-02-09 19:31:30
2016-02-09 19:31:52          2016-02-09 19:32:00
2016-02-09 19:31:28          2016-02-09 19:31:30

Is it possible to do that in PySpark?


Solution
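
The steps below assume a DataFrame with a single column named old_timestamp holding the values from the question. As a minimal sketch (the SparkSession setup, the column name, and the cast to a timestamp type are assumptions, not part of the original post), the sample data can be reproduced like this:

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as f

    spark = SparkSession.builder.getOrCreate()

    # Sample rows from the question; the column name "old_timestamp" is an
    # assumption and is reused throughout the steps below.
    df = spark.createDataFrame(
        [("2016-02-09 19:31:02",),
         ("2016-02-09 19:31:35",),
         ("2016-02-09 19:31:52",),
         ("2016-02-09 19:31:28",)],
        ["old_timestamp"]
    ).withColumn("old_timestamp", f.col("old_timestamp").cast("timestamp"))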

  • If you're using Spark version 1.5+, you can use pyspark.sql.functions.second() to get the seconds from your timestamp column.

    import pyspark.sql.functions as f
    df.withColumn("second", f.second("old_timestamp")).show()
    #+-------------------+------+
    #|      old_timestamp|second|
    #+-------------------+------+
    #|2016-02-09 19:31:02|     2|
    #|2016-02-09 19:31:35|    35|
    #|2016-02-09 19:31:52|    52|
    #|2016-02-09 19:31:28|    28|
    #+-------------------+------+
    

    Once you have the seconds part, you can divide it by 30, round the result, and multiply by 30 to get the "new" second.

    df.withColumn("second", f.second("old_timestamp"))\
        .withColumn("new_second", f.round(f.col("second")/30)*30)\
        .show()
    #+-------------------+------+----------+
    #|      old_timestamp|second|new_second|
    #+-------------------+------+----------+
    #|2016-02-09 19:31:02|     2|       0.0|
    #|2016-02-09 19:31:35|    35|      30.0|
    #|2016-02-09 19:31:52|    52|      60.0|
    #|2016-02-09 19:31:28|    28|      30.0|
    #+-------------------+------+----------+
    

    From the "new" second, we can compute an offset in seconds which, when added to the original timestamp, produces the desired "rounded" timestamps.

    df.withColumn("second", f.second("old_timestamp"))\
        .withColumn("new_second", f.round(f.col("second")/30)*30)\
        .withColumn("add_seconds", f.col("new_second") - f.col("second"))\
        .show()
    #+-------------------+------+----------+-----------+
    #|      old_timestamp|second|new_second|add_seconds|
    #+-------------------+------+----------+-----------+
    #|2016-02-09 19:31:02|     2|       0.0|       -2.0|
    #|2016-02-09 19:31:35|    35|      30.0|       -5.0|
    #|2016-02-09 19:31:52|    52|      60.0|        8.0|
    #|2016-02-09 19:31:28|    28|      30.0|        2.0|
    #+-------------------+------+----------+-----------+
    

    As we can see, a negative number in this column means the original time gets rounded down, while a positive number means it gets rounded up.

    To add this offset to the original timestamp, first convert the timestamp to a Unix timestamp (seconds since the epoch) using pyspark.sql.functions.unix_timestamp(). After adding the offset, convert the result back using pyspark.sql.functions.from_unixtime().
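
    As a minimal sketch of just this conversion step (output omitted here, because the epoch values returned by unix_timestamp() depend on the session time zone):

    df.select(
        "old_timestamp",
        f.unix_timestamp("old_timestamp").alias("unix_ts")  # seconds since epoch
    ).show()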

    Putting this all together (condensing the intermediate steps):

    df.withColumn(
            "add_seconds",
            (f.round(f.second("old_timestamp")/30)*30) - f.second("old_timestamp")
        )\
        .withColumn(
            "new_timestamp",
            f.from_unixtime(f.unix_timestamp("old_timestamp") + f.col("add_seconds"))
        )\
        .drop("add_seconds")\
        .show()
    #+-------------------+-------------------+
    #|      old_timestamp|      new_timestamp|
    #+-------------------+-------------------+
    #|2016-02-09 19:31:02|2016-02-09 19:31:00|
    #|2016-02-09 19:31:35|2016-02-09 19:31:30|
    #|2016-02-09 19:31:52|2016-02-09 19:32:00|
    #|2016-02-09 19:31:28|2016-02-09 19:31:30|
    #+-------------------+-------------------+
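
    One caveat: from_unixtime() returns a string column (formatted as yyyy-MM-dd HH:mm:ss by default), not a true timestamp type. If you need a timestamp for downstream date arithmetic, a cast along these lines should work (a sketch; df_rounded is a hypothetical name for the result of the final step above):

    df_rounded = df_rounded.withColumn(
        "new_timestamp",
        f.col("new_timestamp").cast("timestamp")  # StringType -> TimestampType
    )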