dataframe, scala, apache-spark, apache-spark-sql, aws-glue

How to join two dataframes based on start and end timestamps using Spark


I have two dataframes, shown below, that contain each trip's start and end timestamps.

For example, consider the source dataframe where BUS1 departs from CITY1 at 2023-12-17 07:27:00. In the second dataframe, BUS1 arrives at CITY2 at 2023-12-17 08:27:00. Then BUS1 departs from CITY2 at 2023-12-17 08:27:00 and arrives at CITY1 at 2023-12-17 09:50:00. Here the arrival timestamp has to be treated as the start timestamp of the next trip. This pattern repeats for all other trips.

 Source Dataframe                    Destination Dataframe
+----+-------------------+------+    +----+-------------------+-----------+
|  ID|START_TIMESTAMP    |SOURCE|    |  ID|  END_TIMESTAMP    |DESTINATION|
+----+-------------------+------+    +----+-------------------+-----------+
|BUS1|2023-12-17 07:27:00| CITY1|    |BUS1|2023-12-17 08:27:00| CITY2     |
|BUS1|2023-12-17 09:50:00| CITY1|    |BUS1|2023-12-17 11:13:00| CITY2     |
|BUS1|2023-12-17 12:23:00| CITY1|    |BUS1|2023-12-17 14:50:00| CITY2     |
|BUS1|2023-12-17 16:13:00| CITY1|    |BUS2|2023-12-17 07:09:00| CITY2     |
|BUS2|2023-12-17 06:08:00| CITY1|    |BUS2|2023-12-17 09:53:00| CITY2     |
|BUS2|2023-12-17 08:31:00| CITY1|    |BUS2|2023-12-17 13:31:00| CITY2     |
|BUS2|2023-12-17 11:13:00| CITY1|    +----+-------------------+-----------+
|BUS2|2023-12-17 14:44:00| CITY1|
+----+-------------------+------+

The expected output is like below:

+----+-------------------+-----------------------+------+
|  ID|START_TIMESTAMP    |      END_TIMESTAMP    |stop  |
+----+-------------------+-----------------------+------+
|BUS1|2023-12-17 07:27:00|    2023-12-17 08:27:00|CITY2 |
|BUS1|2023-12-17 08:27:00|    2023-12-17 09:50:00|CITY1 |
|BUS1|2023-12-17 09:50:00|    2023-12-17 11:13:00|CITY2 |
|BUS1|2023-12-17 11:13:00|    2023-12-17 12:23:00|CITY1 |
|BUS1|2023-12-17 12:23:00|    2023-12-17 14:50:00|CITY2 |
|BUS1|2023-12-17 14:50:00|    2023-12-17 16:13:00|CITY1 |
|BUS2|2023-12-17 06:08:00|    2023-12-17 07:09:00|CITY2 |
|BUS2|2023-12-17 07:09:00|    2023-12-17 08:31:00|CITY1 |
|BUS2|2023-12-17 08:31:00|    2023-12-17 09:53:00|CITY2 |
|BUS2|2023-12-17 09:53:00|    2023-12-17 11:13:00|CITY1 |
|BUS2|2023-12-17 11:13:00|    2023-12-17 13:31:00|CITY2 |
|BUS2|2023-12-17 13:31:00|    2023-12-17 14:44:00|CITY1 |
+----+-------------------+-----------------------+------+

Solution

  • As you have two very specific dataframes, you don't need a join. You can simply union the two dataframes and then use a window with the lead function to set up the end timestamp and the stop.

    Given your source dataframe:

    // .toDF requires the implicit encoders from your SparkSession
    import spark.implicits._   // assuming `spark` is your SparkSession

    val sourceDataframe = Seq(
      ("BUS1", "2023-12-17 07:27:00", "CITY1"),
      ("BUS1", "2023-12-17 09:50:00", "CITY1"),
      ("BUS1", "2023-12-17 12:23:00", "CITY1"),
      ("BUS1", "2023-12-17 16:13:00", "CITY1"),
      ("BUS2", "2023-12-17 06:08:00", "CITY1"),
      ("BUS2", "2023-12-17 08:31:00", "CITY1"),
      ("BUS2", "2023-12-17 11:13:00", "CITY1"),
      ("BUS2", "2023-12-17 14:44:00", "CITY1")
    ).toDF("ID", "START_TIMESTAMP", "SOURCE")
    

    And your destination dataframe:

    val destinationDataframe = Seq(
      ("BUS1", "2023-12-17 08:27:00", "CITY2"),
      ("BUS1", "2023-12-17 11:13:00", "CITY2"),
      ("BUS1", "2023-12-17 14:50:00", "CITY2"),
      ("BUS2", "2023-12-17 07:09:00", "CITY2"),
      ("BUS2", "2023-12-17 09:53:00", "CITY2"),
      ("BUS2", "2023-12-17 13:31:00", "CITY2")
    ).toDF("ID", "END_TIMESTAMP", "DESTINATION")
    

    You can obtain your result with the following code:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, lead}

    // For each bus, order its departure and arrival rows by timestamp;
    // the "yyyy-MM-dd HH:mm:ss" strings sort chronologically
    val window = Window.partitionBy("ID").orderBy("START_TIMESTAMP")

    // union is positional and keeps the first dataframe's column names,
    // so arrival timestamps land in START_TIMESTAMP and destinations in SOURCE
    val result = sourceDataframe.union(destinationDataframe)
      // the end of a leg is the timestamp of the next row for the same bus
      .withColumn("END_TIMESTAMP", lead("START_TIMESTAMP", 1).over(window))
      // the stop of a leg is the location of the next row
      .withColumn("stop", lead("SOURCE", 1).over(window))
      .drop("SOURCE")
      // the last row of each bus has no next row, so drop it
      .filter(col("END_TIMESTAMP").isNotNull)
    

    And you get the following result:

    +----+-------------------+-------------------+-----+
    |ID  |START_TIMESTAMP    |END_TIMESTAMP      |stop |
    +----+-------------------+-------------------+-----+
    |BUS1|2023-12-17 07:27:00|2023-12-17 08:27:00|CITY2|
    |BUS1|2023-12-17 08:27:00|2023-12-17 09:50:00|CITY1|
    |BUS1|2023-12-17 09:50:00|2023-12-17 11:13:00|CITY2|
    |BUS1|2023-12-17 11:13:00|2023-12-17 12:23:00|CITY1|
    |BUS1|2023-12-17 12:23:00|2023-12-17 14:50:00|CITY2|
    |BUS1|2023-12-17 14:50:00|2023-12-17 16:13:00|CITY1|
    |BUS2|2023-12-17 06:08:00|2023-12-17 07:09:00|CITY2|
    |BUS2|2023-12-17 07:09:00|2023-12-17 08:31:00|CITY1|
    |BUS2|2023-12-17 08:31:00|2023-12-17 09:53:00|CITY2|
    |BUS2|2023-12-17 09:53:00|2023-12-17 11:13:00|CITY1|
    |BUS2|2023-12-17 11:13:00|2023-12-17 13:31:00|CITY2|
    |BUS2|2023-12-17 13:31:00|2023-12-17 14:44:00|CITY1|
    +----+-------------------+-------------------+-----+
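
    If you want the rows grouped by bus and ordered by departure time as above, add an explicit sort before showing the result: the window only fixes the row order that lead uses within each bus partition, not the order of the final output. And if you prefer real timestamp columns instead of strings, a minimal sketch (assuming the yyyy-MM-dd HH:mm:ss format used above) could look like this:

    import org.apache.spark.sql.functions.{col, to_timestamp}

    // Cast the string columns to proper timestamps and sort for display
    result
      .withColumn("START_TIMESTAMP", to_timestamp(col("START_TIMESTAMP"), "yyyy-MM-dd HH:mm:ss"))
      .withColumn("END_TIMESTAMP", to_timestamp(col("END_TIMESTAMP"), "yyyy-MM-dd HH:mm:ss"))
      .orderBy("ID", "START_TIMESTAMP")
      .show(false)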