Tags: scala, dataframe, apache-spark, transformation

How to apply a transformation on a row_number() column


I need to add a new column, startDate, that holds the first created_at value of each partition group, repeated for every row in that group.

Second, whenever tg changes between consecutive rows that share the same "type" and "key", the created_at value of the later row should become the endDate of the row above it. Otherwise endDate stays null.

type        key          tg         created_at   timestamp             row_number
device_id   essentials   template   1600269347   2020-09-21 19:08:05   1
device_id   experiment   t1         1599721314   2020-09-17 01:37:17   1
device_id   experiment   v1         1600228007   2020-09-21 18:07:53   2
device_id   experiment   c1         1605221085   2020-09-21 18:07:53   3
test        t_key        t1         1599714939   2020-09-16 01:37:55   1
test        t_key        t2         1600084857   2020-09-21 17:08:23   2

Steps applied so far:

    val windowSpec = Window.partitionBy("type","key").orderBy("timestamp")
    test.withColumn("row_number", row_number().over(windowSpec)).show()

Expected output:

type        key          tg         created_at   timestamp             row_number   startDate    endDate
device_id   essentials   template   1600269347   2020-09-21 19:08:05   1            1600269347   null
device_id   experiment   t1         1599721314   2020-09-17 01:37:17   1            1599721314   1600228007
device_id   experiment   v1         1600228007   2020-09-21 18:07:53   2            1599721314   1605221085
device_id   experiment   c1         1605221085   2020-09-21 18:07:53   3            1599721314   null
test        t_key        t1         1599714939   2020-09-16 01:37:55   1            1599714939   1600084857
test        t_key        t2         1600084857   2020-09-21 17:08:23   2            1599714939   null

Any suggestions on how to proceed?


Solution

  • You can use first over your window to get the first value of created_at. min would also work in this case, because created_at increases with timestamp within each group of the sample data.

    The second one is a little trickier. You need to use lead to look at the next row in the window, and keep in mind that the result of lead for the last row in the window will always be null.

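    import java.sql.Timestamp
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    val schema = List(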
      StructField("type", StringType, true),
      StructField("key", StringType, true),
      StructField("tg", StringType, true),
      StructField("created_at", IntegerType, true),
      StructField("timestamp", TimestampType, true),
      StructField("row_number", IntegerType, true)
    )
    
    val data =  Seq(
        Row("device_id", "essentials", "template", 1600269347, Timestamp.valueOf("2020-09-21 19:08:05"), 1),
        Row("device_id", "experiment", "t1", 1599721314, Timestamp.valueOf("2020-09-17 01:37:17"), 1),  
        Row("device_id", "experiment", "v1", 1600228007, Timestamp.valueOf("2020-09-21 18:07:53"), 2),
        Row("device_id", "experiment", "c1", 1605221085, Timestamp.valueOf("2020-09-21 18:07:53"), 3),
        Row("test", "t_key", "t1", 1599714939, Timestamp.valueOf("2020-09-16 01:37:55"), 1),
        Row("test", "t_key", "t2", 1600084857, Timestamp.valueOf("2020-09-21 17:08:23"), 2)
      )
    
    val test = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
    
    val windowSpec = Window.partitionBy("type","key").orderBy("timestamp")
    
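    test
      // startDate: created_at of the earliest row in each (type, key) partition
      .withColumn("startDate", first(col("created_at")).over(windowSpec))
      // endDate: the next row's created_at, only when a next row exists and its tg differs
      .withColumn("endDate", when(
        lead(col("tg"), 1).over(windowSpec).isNotNull &&
        lead(col("tg"), 1).over(windowSpec) =!= col("tg"),
        lead(col("created_at"), 1).over(windowSpec)
      ).otherwise(lit(null).cast(IntegerType)))
      .show()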
    
    +---------+----------+--------+----------+-------------------+----------+----------+----------+
    |     type|       key|      tg|created_at|          timestamp|row_number| startDate|   endDate|
    +---------+----------+--------+----------+-------------------+----------+----------+----------+
    |device_id|essentials|template|1600269347|2020-09-21 19:08:05|         1|1600269347|      null|
    |device_id|experiment|      t1|1599721314|2020-09-17 01:37:17|         1|1599721314|1600228007|
    |device_id|experiment|      v1|1600228007|2020-09-21 18:07:53|         2|1599721314|1605221085|
    |device_id|experiment|      c1|1605221085|2020-09-21 18:07:53|         3|1599721314|      null|
    |     test|     t_key|      t1|1599714939|2020-09-16 01:37:55|         1|1599714939|1600084857|
    |     test|     t_key|      t2|1600084857|2020-09-21 17:08:23|         2|1599714939|      null|
    +---------+----------+--------+----------+-------------------+----------+----------+----------+
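
To make the first/lead semantics concrete, here is a minimal plain-Scala sketch of the same logic on ordinary collections, without Spark. It is only a model of the window computation, not a replacement for the answer above. The names Event, Dated, WindowSketch, withDates and sample are hypothetical, timestamps are kept as strings for simplicity, and the secondary sort on createdAt is an assumption added here: the v1 and c1 rows share the same timestamp, so ordering by timestamp alone may not fix their relative order.

```scala
// Plain-Scala model of the window logic; no Spark required.
// Event, Dated and WindowSketch are hypothetical names used only for this sketch.
final case class Event(tpe: String, key: String, tg: String, createdAt: Int, timestamp: String)
final case class Dated(event: Event, startDate: Int, endDate: Option[Int])

object WindowSketch {
  def withDates(events: Seq[Event]): Seq[Dated] =
    events
      .groupBy(e => (e.tpe, e.key)) // plays the role of partitionBy("type", "key")
      .values
      .flatMap { group =>
        // orderBy("timestamp"); createdAt is an assumed tie-breaker for equal timestamps
        val ordered = group.sortBy(e => (e.timestamp, e.createdAt))
        val start = ordered.head.createdAt // first(created_at) over the partition
        ordered.zipWithIndex.map { case (cur, i) =>
          val next = ordered.lift(i + 1) // lead(..., 1): None for the last row
          // keep the next created_at only when the next row's tg differs
          Dated(cur, start, next.filter(_.tg != cur.tg).map(_.createdAt))
        }
      }
      .toSeq

  // The six rows from the question, with timestamps kept as sortable strings.
  val sample: Seq[Event] = Seq(
    Event("device_id", "essentials", "template", 1600269347, "2020-09-21 19:08:05"),
    Event("device_id", "experiment", "t1", 1599721314, "2020-09-17 01:37:17"),
    Event("device_id", "experiment", "v1", 1600228007, "2020-09-21 18:07:53"),
    Event("device_id", "experiment", "c1", 1605221085, "2020-09-21 18:07:53"),
    Event("test", "t_key", "t1", 1599714939, "2020-09-16 01:37:55"),
    Event("test", "t_key", "t2", 1600084857, "2020-09-21 17:08:23")
  )
}
```

Calling WindowSketch.withDates(WindowSketch.sample) returns one Dated per input row. The order of groups in the result is unspecified, so look rows up by key and tg rather than by position. In the Spark version, the analogous tie-breaker would be a second column in orderBy, which is likewise an assumption about the intended ordering.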