Tags: scala, apache-spark-sql, unix-timestamp

Order By Timestamp is not working for Date time column in Scala Spark


Here is my DataFrame:

+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+----------------------------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+------------------------------+------------+
|DataPartition|TimeStamp                |OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|AuditorOpinionOnInternalControlsId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnGoingConcernId|tobefiltered|
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+----------------------------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+------------------------------+------------+
|Japan        |2018-04-04T09:53:35+00:00|4295877275    |181     |3185     |3023399             |UNQ               |3010546         |3010546                           |true                |false                  |false                  |O|!|       |null                               |null                            |null                          |O|!|        |
|Japan        |2018-04-04T08:36:57+00:00|4295877275    |189     |3185     |3023399             |UNQ               |3010546         |3010546                           |true                |false                  |false                  |O|!|       |null                               |null                            |null                          |O|!|        |
|Japan        |2018-04-04T08:39:19+00:00|4295877275    |173     |3185     |3023399             |UNQ               |3010546         |3010546                           |true                |false                  |false                  |O|!|       |null                               |null                            |null                          |O|!|        |
|Japan        |2018-04-04T08:24:17+00:00|4295877275    |196     |5913     |3026579             |UWE               |3010547         |null                              |true                |false                  |false                  |I|!|       |null                               |null                            |null                          |I|!|        |
|Japan        |2018-04-04T08:24:17+00:00|4295877275    |196     |3185     |3023399             |UNQ               |3010546         |3010546                           |true                |false                  |false                  |I|!|       |null                               |null                            |null                          |I|!|        |
|Japan        |2018-04-04T09:53:35+00:00|4295877275    |196     |null     |null                |null              |null            |null                              |null                |null                   |null                   |D|!|       |null                               |null                            |null                          |I|!|        |
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+----------------------------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+------------------------------+------------+

This is what I am doing to get the latest row for each (OrganizationID, SourceID) pair:

val windowSpec3 = Window.partitionBy("OrganizationID", "SourceID")
  .orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").desc)
val latestForEachKey3 = latestForEachKey.withColumn("rank", row_number.over(windowSpec3))
  .filter($"rank" === 1).drop("rank").drop("tobefiltered", "TimeStamp")
latestForEachKey3.show(false)

This gives me the output below:

+-------------+--------------+--------+---------+--------------------+------------------+----------------+----------------------------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+------------------------------+
|DataPartition|OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|AuditorOpinionOnInternalControlsId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnGoingConcernId|
+-------------+--------------+--------+---------+--------------------+------------------+----------------+----------------------------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+------------------------------+
|Japan        |4295877275    |181     |3185     |3023399             |UNQ               |3010546         |3010546                           |true                |false                  |false                  |O|!|       |null                               |null                            |null                          |
|Japan        |4295877275    |189     |3185     |3023399             |UNQ               |3010546         |3010546                           |true                |false                  |false                  |O|!|       |null                               |null                            |null                          |
|Japan        |4295877275    |173     |3185     |3023399             |UNQ               |3010546         |3010546                           |true                |false                  |false                  |O|!|       |null                               |null                            |null                          |
|Japan        |4295877275    |196     |5913     |3026579             |UWE               |3010547         |null                              |true                |false                  |false                  |I|!|       |null                               |null                            |null                          |
+-------------+--------------+--------+---------+--------------------+------------------+----------------+----------------------------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+------------------------------+

So, based on this logic, of the three rows with SourceID 196 I should get the one with the timestamp below:

2018-04-04T09:53:35+00:00|4295877275    |196     |null     |null                

The issue is that the rank is computed, but .orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").desc) does not sort the rows correctly.

I also tried the date format YYYY-MM-DDThh:mm:ssTZD, with the same result.


Solution

  • The timestamp format does not match the data: the values have a literal T between the date and the time, and no milliseconds.

    Instead of

    "yyyy-MM-dd HH:mm:ss.SSS"

    use

    "yyyy-MM-dd'T'HH:mm:ss"
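
To see why the first pattern cannot work, here is a minimal sketch that checks both patterns against one value from the question using plain java.text.SimpleDateFormat, which is what Spark 2.x relies on for unix_timestamp patterns as far as I know. The object PatternCheck and the helper parsesWith are hypothetical names made up for this illustration, not part of Spark.

```scala
import java.text.SimpleDateFormat
import scala.util.Try

object PatternCheck {
  // Sample value copied from the TimeStamp column in the question.
  val sample = "2018-04-04T09:53:35+00:00"

  // Hypothetical helper: true if `pattern` can parse `value`.
  def parsesWith(pattern: String, value: String): Boolean = {
    val fmt = new SimpleDateFormat(pattern)
    fmt.setLenient(false)
    // parse throws ParseException when the text does not match the pattern.
    Try(fmt.parse(value)).isSuccess
  }

  def main(args: Array[String]): Unit = {
    // Original pattern: expects a space where the data has 'T'.
    println(parsesWith("yyyy-MM-dd HH:mm:ss.SSS", sample)) // false
    // Corrected pattern: the quoted 'T' matches the literal T in the data.
    println(parsesWith("yyyy-MM-dd'T'HH:mm:ss", sample))   // true
  }
}
```

When the pattern does not match, unix_timestamp returns null instead of failing, so every row in a partition gets the same (null) sort key and row_number picks one arbitrarily. Note that the corrected pattern only matches up to the seconds and leaves the trailing +00:00 unparsed. Newer Spark versions use a different parser that may be stricter about this, so there the offset may need to be matched explicitly, for example with "yyyy-MM-dd'T'HH:mm:ssXXX". Treat that as an assumption to verify against your Spark version.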