Spark 2.3 timestamp subtract milliseconds


I am using Spark 2.3, and I have read that it does not support millisecond precision for timestamps (only 2.4+), but I am looking for ideas on how to do what I need.

The data I am processing stores dates with the String data type in Parquet files, in this format: 2021-07-09T01:41:58Z

I need to subtract one millisecond from that. If it were Spark 2.4, I think I could do something like this:

    to_timestamp(col("sourceStartTimestamp")) - expr("INTERVAL 0.001 SECONDS")

But since this is Spark 2.3, that has no effect. I confirmed that it can subtract 1 second, but it ignores any value smaller than a second.

Can anyone suggest a workaround for doing this in Spark 2.3? Ultimately, the result needs to be of String data type, if that makes any difference.
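To pin down the expected result, the per-value operation can be sketched in plain Scala with java.time, outside Spark. This is only a sketch under two assumptions: the trailing Z means UTC (Instant.parse requires an ISO-8601 instant), and the output should keep the same ISO shape with a three-digit millisecond field. The object and function names are hypothetical.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object MinusOneMilli {
  // Output pattern: ISO-8601 with milliseconds and a literal 'Z', rendered in UTC
  // (withZone is needed so the formatter can print an Instant).
  private val outFmt =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC)

  // Hypothetical helper: parse an ISO-8601 UTC string, subtract 1 ms, format back to a String.
  def minusOneMilli(s: String): String =
    outFmt.format(Instant.parse(s).minusMillis(1))

  def main(args: Array[String]): Unit =
    println(minusOneMilli("2021-07-09T01:41:58Z")) // prints 2021-07-09T01:41:57.999Z
}
```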


Solution

  • Since millisecond timestamps aren't supported by Spark 2.3 (or below), consider using a UDF that takes a delta in milliseconds and a date format, and uses java.time's plusNanos() to get what you need:

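    import java.time.format.DateTimeFormatter
    import org.apache.spark.sql.functions.{to_timestamp, udf}
    
    // Shifts a timestamp by `delta` milliseconds (negative = earlier) via plusNanos
    // and formats the result as a String with the pattern `fmt`
    def getMillisTS(delta: Long, fmt: String = "yyyy-MM-dd HH:mm:ss.SSS") = udf {
      (ts: java.sql.Timestamp) =>
        if (ts == null) null // pass nulls through instead of throwing a NullPointerException
        else ts.toLocalDateTime.plusNanos(delta * 1000000L).format(DateTimeFormatter.ofPattern(fmt))
    }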
    

    Test-running the UDF (in spark-shell, where spark.implicits._ is imported automatically for toDF and $):

    val df = Seq("2021-01-01 00:00:00", "2021-02-15 12:30:00").toDF("ts")
    
    df.withColumn("millisTS", getMillisTS(-1)($"ts")).show(false)
    /*
    +-------------------+-----------------------+
    |ts                 |millisTS               |
    +-------------------+-----------------------+
    |2021-01-01 00:00:00|2020-12-31 23:59:59.999|
    |2021-02-15 12:30:00|2021-02-15 12:29:59.999|
    +-------------------+-----------------------+
    */
    
    df.withColumn("millisTS", getMillisTS(5000)($"ts")).show(false)
    /*
    +-------------------+-----------------------+
    |ts                 |millisTS               |
    +-------------------+-----------------------+
    |2021-01-01 00:00:00|2021-01-01 00:00:05.000|
    |2021-02-15 12:30:00|2021-02-15 12:30:05.000|
    +-------------------+-----------------------+
    */
    
    // Input in the question's ISO format; a new name avoids redefining df outside the REPL
    val dfIso = Seq("2021-01-01T00:00:00Z", "2021-02-15T12:30:00Z").toDF("ts")
    
    dfIso.withColumn(
        "millisTS",
        getMillisTS(-1, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")(to_timestamp($"ts", "yyyy-MM-dd'T'HH:mm:ss'Z'"))
      ).show(false)
    /*
    +--------------------+------------------------+
    |ts                  |millisTS                |
    +--------------------+------------------------+
    |2021-01-01T00:00:00Z|2020-12-31T23:59:59.999Z|
    |2021-02-15T12:30:00Z|2021-02-15T12:29:59.999Z|
    +--------------------+------------------------+
    */
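The conversion inside the UDF does not depend on Spark, so it can be checked as an ordinary function before wrapping it in udf. A minimal sketch: shiftMillis is a hypothetical name for the same lambda body (1 ms = 1,000,000 ns, hence delta * 1000000L). It assumes the input Timestamp is built from a LocalDateTime, because toLocalDateTime converts using the JVM default time zone and Timestamp.valueOf(LocalDateTime) uses that same zone, so the round trip keeps the wall-clock value.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object ShiftMillisCheck {
  // Same logic as the UDF body: shift by `delta` milliseconds via plusNanos,
  // then format with `fmt`. Nulls pass through unchanged.
  def shiftMillis(ts: Timestamp, delta: Long, fmt: String = "yyyy-MM-dd HH:mm:ss.SSS"): String =
    if (ts == null) null
    else ts.toLocalDateTime.plusNanos(delta * 1000000L).format(DateTimeFormatter.ofPattern(fmt))

  def main(args: Array[String]): Unit = {
    // Built from a wall-clock LocalDateTime in the JVM default zone
    val ts = Timestamp.valueOf(LocalDateTime.of(2021, 1, 1, 0, 0, 0))
    println(shiftMillis(ts, -1))                                   // 2020-12-31 23:59:59.999
    println(shiftMillis(ts, 5000))                                 // 2021-01-01 00:00:05.000
    println(shiftMillis(ts, -1, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))   // 2020-12-31T23:59:59.999Z
  }
}
```

Keeping the logic in a plain function like this also makes it easy to reuse the same code for both the UDF and its unit tests.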