
Scala Spark: flatMap or an alternative function


I'm new to Spark, coming from Apache Flink, and I'm looking for advice on a transformation requirement. In Flink I would handle this with a stateful flatMap, but I'm not sure whether the same approach applies in Spark or whether there is a better alternative.

I'm reading from one Delta table and need to write the output to another.

Given the following table format:

    Time                Name        Value
    "2023-11-01T12:51"  ID          75B
    "2023-11-01T12:52"  Pressure    5
    "2023-11-01T12:56"  Resistance  20
    "2023-11-01T12:57"  ID          55C
    "2023-11-01T12:57"  Pressure    10

I need the output to be:

    Time                Name        Value  ID
    "2023-11-01T12:52"  Pressure    5      75B
    "2023-11-01T12:56"  Resistance  20     75B
    "2023-11-01T12:57"  Pressure    10     55C

Essentially, every value must be aligned with the ID that was active at that time; all of the data is time series.
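To make the rule concrete, here is a minimal plain-Scala sketch (no Spark) of what a stateful flatMap would compute, assuming the rows are already sorted by time. The case classes `Reading` and `Aligned` and the object `AlignIds` are hypothetical names for illustration. ID rows only update the carried state and are not emitted; values that arrive before any ID are dropped, which is an assumption since the question does not say how to handle them.

```scala
// Hypothetical input/output record types for illustration only.
final case class Reading(time: String, name: String, value: String)
final case class Aligned(time: String, name: String, value: String, id: String)

object AlignIds {
  // Walk the rows in order, carrying the most recent ID as state.
  // ID rows update the state and are dropped; other rows are emitted with the current ID.
  def align(rows: Seq[Reading]): Vector[Aligned] = {
    val (_, out) = rows.foldLeft((Option.empty[String], Vector.empty[Aligned])) {
      case ((_, acc), Reading(_, "ID", id)) => (Some(id), acc)
      case ((Some(id), acc), Reading(t, n, v)) => (Some(id), acc :+ Aligned(t, n, v, id))
      case ((None, acc), _) => (None, acc) // value seen before any ID: skipped (assumption)
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Reading("2023-11-01T12:51", "ID", "75B"),
      Reading("2023-11-01T12:52", "Pressure", "5"),
      Reading("2023-11-01T12:56", "Resistance", "20"),
      Reading("2023-11-01T12:57", "ID", "55C"),
      Reading("2023-11-01T12:57", "Pressure", "10")
    )
    align(input).foreach(println)
  }
}
```

Running `main` prints the three non-ID rows, each paired with the ID that preceded it, matching the required output above.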

So my questions are:

  1. What is the best approach for this alignment: a stateful flatMap, or is it manageable with simple DataFrame transformations?
  2. If flatMap, is there a good resource to follow for implementing this kind of procedure?

Solution

  • A unique identifier can be assigned to each row with the row_number function, and the most recent ID row at or before each row can be found with a range-based Window. The resulting DataFrame is then joined to itself:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._ // assumes a SparkSession named `spark`, e.g. in spark-shell
    
    val df = Seq(
      ("2023-11-01T12:51", "ID", "75B"),
      ("2023-11-01T12:52", "Pressure", "5"),
      ("2023-11-01T12:56", "Resistance", "20"),
      ("2023-11-01T12:57", "ID", "55C"),
      ("2023-11-01T12:57", "Pressure", "10")
    ).toDF("Time", "Name", "Value")
    
    // Running frame: from the first row up to all rows sharing the current Time
    val withCurrentRowWindow = Window.orderBy("Time")
      .rangeBetween(Window.unboundedPreceding, Window.currentRow)
    
    val withIdRowDetected = df
      // Sequential identifier for each row, in Time order
      .withColumn("row_number", row_number().over(Window.orderBy("Time")))
      // Row number of the latest ID row seen so far
      .withColumn("id_row_number", max(
        when($"Name" === lit("ID"), $"row_number") // null for non-ID rows
      ).over(withCurrentRowWindow))
    
    withIdRowDetected.show(false)
    
    // Join every non-ID row to the ID row it points at
    val result = withIdRowDetected
      .alias("non_ids")
      .where($"Name" =!= lit("ID"))
      .join(withIdRowDetected
        .alias("ids")
        .where($"Name" === lit("ID")),
        $"non_ids.id_row_number" === $"ids.row_number", "inner")
      .select($"non_ids.Time", $"non_ids.Name", $"non_ids.Value", $"ids.Value".alias("ID"))
    
    result.show(false)
    

    Output:

    +----------------+----------+-----+----------+-------------+
    |Time            |Name      |Value|row_number|id_row_number|
    +----------------+----------+-----+----------+-------------+
    |2023-11-01T12:51|ID        |75B  |1         |1            |
    |2023-11-01T12:52|Pressure  |5    |2         |1            |
    |2023-11-01T12:56|Resistance|20   |3         |1            |
    |2023-11-01T12:57|ID        |55C  |4         |4            |
    |2023-11-01T12:57|Pressure  |10   |5         |4            |
    +----------------+----------+-----+----------+-------------+
    
    +----------------+----------+-----+---+
    |Time            |Name      |Value|ID |
    +----------------+----------+-----+---+
    |2023-11-01T12:52|Pressure  |5    |75B|
    |2023-11-01T12:56|Resistance|20   |75B|
    |2023-11-01T12:57|Pressure  |10   |55C|
    +----------------+----------+-----+---+
    

    Note: the Window has no partitioning, so Spark moves all rows into a single partition; performance can be poor on large tables.
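A possible alternative that avoids the self-join is a forward fill: keep Value only on ID rows (null elsewhere) and take `last(..., ignoreNulls = true)` over a running window. This is a different technique from the answer above, shown as a sketch rather than a tested drop-in: `ForwardFillIds` and `alignIds` are hypothetical names, and the secondary sort key that places an ID row before a value row with the same Time is an assumption about the desired tie-breaking. Like the original, this window has no partitioning; if the real table has a key column (for example a hypothetical `device` column), adding `partitionBy` on it would let Spark spread the work.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object ForwardFillIds {
  // Attach to every non-ID row the Value of the most recent ID row,
  // using a running last(..., ignoreNulls = true) instead of a self-join.
  def alignIds(df: DataFrame): DataFrame = {
    // Order by Time; within the same Time, ID rows come first (assumption).
    val running = Window
      .orderBy(col("Time"), when(col("Name") === "ID", 0).otherwise(1))
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    df.withColumn(
        "ID",
        // Value on ID rows, null elsewhere; last non-null value so far is the active ID
        last(when(col("Name") === "ID", col("Value")), ignoreNulls = true).over(running))
      .where(col("Name") =!= "ID") // drop the ID rows themselves
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("align-ids").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("2023-11-01T12:51", "ID", "75B"),
      ("2023-11-01T12:52", "Pressure", "5"),
      ("2023-11-01T12:56", "Resistance", "20"),
      ("2023-11-01T12:57", "ID", "55C"),
      ("2023-11-01T12:57", "Pressure", "10")
    ).toDF("Time", "Name", "Value")

    alignIds(df).orderBy("Time").show(false)
    spark.stop()
  }
}
```

The filter is applied after the window column is computed, so ID rows still feed the running `last` before they are removed.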