I have two dataframes like below that contain each trip start and end timestamps
For example, consider a source dataframe where BUS1 departs from CITY1 at 2023-12-17 07:27:00. In a second dataframe, BUS1 arrives at CITY2 at 2023-12-17 08:27:00. then, BUS1 departs from CITY2 at 2023-12-17 08:27:00 and arrives at CITY1 at 2023-12-17 09:50:00...here we need to consider the arrived timestamp as the start timestamp when the next trip starts. This pattern follows for all other trips.
Source Dataframe Destination Dataframe
+----+-------------------+------+ +----+-------------------+-----------+
| ID|START_TIMESTAMP |SOURCE| | ID| END_TIMESTAMP |DESTINATION|
+----+-------------------+------+ +----+-------------------+-----------+
|BUS1|2023-12-17 07:27:00| CITY1| |BUS1|2023-12-17 08:27:00| CITY2 |
|BUS1|2023-12-17 09:50:00| CITY1| |BUS1|2023-12-17 11:13:00| CITY2 |
|BUS1|2023-12-17 12:23:00| CITY1| |BUS1|2023-12-17 14:50:00| CITY2 |
|BUS1|2023-12-17 16:13:00| CITY1| |BUS2|2023-12-17 07:09:00| CITY2 |
|BUS2|2023-12-17 06:08:00| CITY1| |BUS2|2023-12-17 09:53:00| CITY2 |
|BUS2|2023-12-17 08:31:00| CITY1| |BUS2|2023-12-17 13:31:00| CITY2 |
|BUS2|2023-12-17 11:13:00| CITY1| +----+-------------------+-----------+
|BUS2|2023-12-17 14:44:00| CITY1|
+------------------------+------+
Expected output like below
+----+-------------------+-----------------------+------+
| ID|START_TIMESTAMP | END_TIMESTAMP |stop |
+----+-------------------+-----------------------+------+
|BUS1|2023-12-17 07:27:00| 2023-12-17 08:27:00|CITY2 |
|BUS1|2023-12-17 08:27:00| 2023-12-17 09:50:00|CITY1 |
|BUS1|2023-12-17 09:50:00| 2023-12-17 11:13:00|CITY2 |
|BUS1|2023-12-17 11:13:00| 2023-12-17 12:23:00|CITY1 |
|BUS1|2023-12-17 12:23:00| 2023-12-17 14:50:00|CITY2 |
|BUS1|2023-12-17 14:50:00| 2023-12-17 16:13:00|CITY1 |
|BUS2|2023-12-17 06:08:00| 2023-12-17 07:09:00|CITY2 |
|BUS2|2023-12-17 07:09:00| 2023-12-17 08:31:00|CITY1 |
|BUS2|2023-12-17 08:31:00| 2023-12-17 09:53:00|CITY2 |
|BUS2|2023-12-17 09:53:00| 2023-12-17 11:13:00|CITY1 |
|BUS2|2023-12-17 11:13:00| 2023-12-17 13:31:00|CITY2 |
|BUS2|2023-12-17 13:31:00| 2023-12-17 14:44:00|CITY1 |
+----+-------------------+-----------------------+------+
As you have two very specific dataframes, you don't need to join. You can simply union the two dataframe and then use windows with lead
function to setup end timestamp and stop.
Given your source dataframe :
val sourceDataframe = Seq(
("BUS1", "2023-12-17 07:27:00", "CITY1"),
("BUS1", "2023-12-17 09:50:00", "CITY1"),
("BUS1", "2023-12-17 12:23:00", "CITY1"),
("BUS1", "2023-12-17 16:13:00", "CITY1"),
("BUS2", "2023-12-17 06:08:00", "CITY1"),
("BUS2", "2023-12-17 08:31:00", "CITY1"),
("BUS2", "2023-12-17 11:13:00", "CITY1"),
("BUS2", "2023-12-17 14:44:00", "CITY1")
).toDF("ID", "START_TIMESTAMP", "SOURCE")
And your destination dataframe:
val destinationDataframe = Seq(
("BUS1", "2023-12-17 08:27:00","CITY2"),
("BUS1", "2023-12-17 11:13:00","CITY2"),
("BUS1", "2023-12-17 14:50:00","CITY2"),
("BUS2", "2023-12-17 07:09:00","CITY2"),
("BUS2", "2023-12-17 09:53:00","CITY2"),
("BUS2", "2023-12-17 13:31:00","CITY2")
).toDF("ID", "END_TIMESTAMP", "DESTINATION")
You can obtain your result with the following code:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead}
val window = Window.partitionBy("ID").orderBy("START_TIMESTAMP")
val result = sourceDataframe.union(destinationDataframe)
.withColumn("END_TIMESTAMP", lead("START_TIMESTAMP", 1).over(window))
.withColumn("stop", lead("SOURCE", 1).over(window))
.drop("SOURCE")
.filter(col("END_TIMESTAMP").isNotNull)
And you get the following result :
+----+-------------------+-------------------+-----+
|ID |START_TIMESTAMP |END_TIMESTAMP |stop |
+----+-------------------+-------------------+-----+
|BUS1|2023-12-17 07:27:00|2023-12-17 08:27:00|CITY2|
|BUS1|2023-12-17 08:27:00|2023-12-17 09:50:00|CITY1|
|BUS1|2023-12-17 09:50:00|2023-12-17 11:13:00|CITY2|
|BUS1|2023-12-17 11:13:00|2023-12-17 12:23:00|CITY1|
|BUS1|2023-12-17 12:23:00|2023-12-17 14:50:00|CITY2|
|BUS1|2023-12-17 14:50:00|2023-12-17 16:13:00|CITY1|
|BUS2|2023-12-17 06:08:00|2023-12-17 07:09:00|CITY2|
|BUS2|2023-12-17 07:09:00|2023-12-17 08:31:00|CITY1|
|BUS2|2023-12-17 08:31:00|2023-12-17 09:53:00|CITY2|
|BUS2|2023-12-17 09:53:00|2023-12-17 11:13:00|CITY1|
|BUS2|2023-12-17 11:13:00|2023-12-17 13:31:00|CITY2|
|BUS2|2023-12-17 13:31:00|2023-12-17 14:44:00|CITY1|
+----+-------------------+-------------------+-----+