Tags: apache-spark, apache-spark-mllib

Spark: add a column that increases by 1 each time another column is true


The source data is:

    col1
    -----
    false
    false
    true
    false
    false
    true
    true
    false

I want to add a new column, col2, that acts as a running counter: whenever col1 is true, col2 increases by 1; otherwise it keeps its previous value. I expect:

    col1,col2
    ---------
    false,0
    false,0
    true,1
    false,1
    false,1
    true,2
    true,3
    false,3

How can I add this column?


Solution

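  • A window function can be used: give each row an increasing id with monotonically_increasing_id() so the current row order is kept, turn col1 into a 0/1 increment, and take a running sum of that increment over a window ordered by the id: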

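    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{monotonically_increasing_id, sum, when}
    import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

    val df = Seq(false, false, true, false, false, true, true, false).toDF("col1")
    // Keep the current row order in "id"; map true -> 1 and false -> 0
    val ordered = df
      .withColumn("id", monotonically_increasing_id())
      .withColumn("increment", when($"col1" === true, 1).otherwise(0))

    // Running sum from the first row up to the current row, ordered by id
    val idWindow = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    val result = ordered.select($"col1", sum($"increment").over(idWindow).alias("col2"))
    result.show(false)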
    

    Output:

    +-----+----+
    |col1 |col2|
    +-----+----+
    |false|0   |
    |false|0   |
    |true |1   |
    |false|1   |
    |false|1   |
    |true |2   |
    |true |3   |
    |false|3   |
    +-----+----+
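To sanity-check the expected col2 values without a Spark session, the same logic can be sketched on a plain Scala collection. This is only an illustration, not part of the Spark answer: `RunningTrueCount` and `runningCount` are hypothetical names, and it assumes the rows are already in the intended order. `scanLeft` produces the same prefix sums as a window frame running from the first row to the current row.

```scala
// Plain Scala (no Spark): running count of `true` values, mirroring the
// window frame "unbounded preceding to current row".
object RunningTrueCount {
  def runningCount(flags: Seq[Boolean]): Seq[Int] =
    flags
      .map(b => if (b) 1 else 0) // same role as the "increment" column
      .scanLeft(0)(_ + _)        // prefix sums, starting from 0
      .tail                      // drop the leading 0 so results align with input rows

  def main(args: Array[String]): Unit = {
    val col1 = Seq(false, false, true, false, false, true, true, false)
    val col2 = runningCount(col1)
    // Prints one "col1,col2" pair per row
    col1.zip(col2).foreach { case (c1, c2) => println(s"$c1,$c2") }
  }
}
```

On a DataFrame there is no guaranteed row order without an explicit ordering column, which is why the Spark version orders the window by `id`. Also note that a window with `orderBy` but no `partitionBy` moves all rows into a single partition, so it is fine for small data but can be slow on large datasets.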