I'm new to Spark, coming from Apache Flink, and I'm looking for advice on a transformation requirement. In Flink I would handle this with a stateful flatMap, but I'm not sure whether the same approach applies in Spark or whether there is a better alternative.
I'm reading from one Delta table and need to write the output to another.
Given the table format below:
Time | Name | Value |
---|---|---|
"2023-11-01T12:51" | ID | 75B |
"2023-11-01T12:52" | Pressure | 5 |
"2023-11-01T12:56" | Resistance | 20 |
"2023-11-01T12:57" | ID | 55C |
"2023-11-01T12:57" | Pressure | 10 |
I require the output as
Time | Name | Value | ID |
---|---|---|---|
"2023-11-01T12:52" | Pressure | 5 | 75B |
"2023-11-01T12:56" | Resistance | 20 | 75B |
"2023-11-01T12:57" | Pressure | 10 | 55C |
Essentially, every value should be aligned with the ID that was active at that time. All of the data is time series.
So my questions are
A unique identifier can be assigned to each row with the row_number function, and the most recent ID row can then be found with a range-based Window. The resulting DataFrame is joined to itself:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

val df = Seq(
  ("2023-11-01T12:51", "ID", "75B"),
  ("2023-11-01T12:52", "Pressure", "5"),
  ("2023-11-01T12:56", "Resistance", "20"),
  ("2023-11-01T12:57", "ID", "55C"),
  ("2023-11-01T12:57", "Pressure", "10")
).toDF("Time", "Name", "Value")

// Frame covering every row up to the current Time, including rows with the same Time
val withCurrentRowWindow = Window.orderBy("Time")
  .rangeBetween(Window.unboundedPreceding, Window.currentRow)

val withIdRowDetected = df
  // Number the rows in Time order
  .withColumn("row_number", row_number().over(Window.orderBy("Time")))
  // For each row, keep the row_number of the latest ID row seen so far
  .withColumn("id_row_number", max(
    when($"Name" === lit("ID"), $"row_number").otherwise(null)
  ).over(withCurrentRowWindow))
withIdRowDetected.show(false)

// Join every non-ID row to the ID row it points at
val result = withIdRowDetected
  .alias("non_ids")
  .where($"Name" =!= lit("ID"))
  .join(withIdRowDetected
    .alias("ids")
    .where($"Name" === lit("ID")),
    $"non_ids.id_row_number" === $"ids.row_number", "inner")
  .select($"non_ids.Time", $"non_ids.Name", $"non_ids.Value", $"ids.Value".alias("ID"))
result.show(false)
Output:
+----------------+----------+-----+----------+-------------+
|Time |Name |Value|row_number|id_row_number|
+----------------+----------+-----+----------+-------------+
|2023-11-01T12:51|ID |75B |1 |1 |
|2023-11-01T12:52|Pressure |5 |2 |1 |
|2023-11-01T12:56|Resistance|20 |3 |1 |
|2023-11-01T12:57|ID |55C |4 |4 |
|2023-11-01T12:57|Pressure |10 |5 |4 |
+----------------+----------+-----+----------+-------------+
+----------------+----------+-----+---+
|Time |Name |Value|ID |
+----------------+----------+-----+---+
|2023-11-01T12:52|Pressure |5 |75B|
|2023-11-01T12:56|Resistance|20 |75B|
|2023-11-01T12:57|Pressure |10 |55C|
+----------------+----------+-----+---+
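As a possible alternative to the self-join, the active ID can be carried forward directly with last(..., ignoreNulls = true) over the same kind of range window. This is only a sketch: the object name ActiveIdFill, the helper withActiveId and the local SparkSession setup are made up for illustration, and it assumes Time sorts correctly as a string, as it does for the ISO-style values in the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last, when}

// Hypothetical helper object, not part of the original answer
object ActiveIdFill {

  // Adds an "ID" column holding the latest ID value seen up to each row's Time,
  // then drops the ID rows themselves.
  def withActiveId(df: DataFrame): DataFrame = {
    // Range frame: all rows up to the current Time, including rows with equal Time
    val upToNow = Window
      .orderBy("Time")
      .rangeBetween(Window.unboundedPreceding, Window.currentRow)

    // Value on ID rows, null elsewhere (when without otherwise yields null)
    val idOnly = when(col("Name") === "ID", col("Value"))

    df.withColumn("ID", last(idOnly, ignoreNulls = true).over(upToNow))
      .where(col("Name") =!= "ID")
  }

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only (assumption)
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("active-id-fill")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("2023-11-01T12:51", "ID", "75B"),
      ("2023-11-01T12:52", "Pressure", "5"),
      ("2023-11-01T12:56", "Resistance", "20"),
      ("2023-11-01T12:57", "ID", "55C"),
      ("2023-11-01T12:57", "Pressure", "10")
    ).toDF("Time", "Name", "Value")

    withActiveId(df).show(false)
    spark.stop()
  }
}
```

This avoids the extra row_number column and the join, but it still uses a window without partitioning.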
Note: the window has no partitioning, so Spark moves all rows into a single partition to evaluate it. Performance can be poor on large tables.
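If the real table has a column that separates independent series, partitioning the window by it lets Spark process each series separately. The sketch below assumes a hypothetical Device column, which is not in the question's table, and fills the active ID per device with last(..., ignoreNulls = true); the object and method names are also made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last, when}

// Hypothetical helper object; "Device" is an assumed column, not from the question
object PartitionedActiveId {

  def withActiveId(df: DataFrame): DataFrame = {
    // One window per device, ordered by Time within that device
    val perDevice = Window
      .partitionBy("Device")
      .orderBy("Time")
      .rangeBetween(Window.unboundedPreceding, Window.currentRow)

    // Value on ID rows, null elsewhere
    val idOnly = when(col("Name") === "ID", col("Value"))

    df.withColumn("ID", last(idOnly, ignoreNulls = true).over(perDevice))
      .where(col("Name") =!= "ID")
  }

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only (assumption)
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("partitioned-active-id")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample data with two interleaved devices
    val df = Seq(
      ("dev-1", "2023-11-01T12:51", "ID", "75B"),
      ("dev-1", "2023-11-01T12:52", "Pressure", "5"),
      ("dev-2", "2023-11-01T12:53", "ID", "55C"),
      ("dev-2", "2023-11-01T12:54", "Pressure", "10"),
      ("dev-1", "2023-11-01T12:56", "Resistance", "20")
    ).toDF("Device", "Time", "Name", "Value")

    withActiveId(df).show(false)
    spark.stop()
  }
}
```

With this layout an ID from one device is never applied to another device's rows, even when their timestamps interleave. Whether such a key exists depends on the actual data.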