I need to apply a transformation such that the first value of the created_at field in each partition group is written to a new column startDate for the entire group.
Secondly, whenever tg changes for the same "type" and "key", the created_at value of that row should become the endDate of the previous row with the same "type" and "key"; otherwise endDate remains null.
type       key         tg        created_at   timestamp             row_number
device_id  essentials  template  1600269347   2020-09-21 19:08:05   1
device_id  experiment  t1        1599721314   2020-09-17 01:37:17   1
device_id  experiment  v1        1600228007   2020-09-21 18:07:53   2
device_id  experiment  c1        1605221085   2020-09-21 18:07:53   3
test       t_key       t1        1599714939   2020-09-16 01:37:55   1
test       t_key       t2        1600084857   2020-09-21 17:08:23   2
Steps applied so far:

val windowSpec = Window.partitionBy("type", "key").orderBy("timestamp")
test.withColumn("row_number", row_number().over(windowSpec)).show()
Expected output:
type       key         tg        created_at   timestamp             row_number  startDate    endDate
device_id  essentials  template  1600269347   2020-09-21 19:08:05   1           1600269347   null
device_id  experiment  t1        1599721314   2020-09-17 01:37:17   1           1599721314   1600228007
device_id  experiment  v1        1600228007   2020-09-21 18:07:53   2           1599721314   1605221085
device_id  experiment  c1        1605221085   2020-09-21 18:07:53   3           1599721314   null
test       t_key       t1        1599714939   2020-09-16 01:37:55   1           1599714939   1600084857
test       t_key       t2        1600084857   2020-09-21 17:08:23   2           1599714939   null
Any suggestions on how to proceed?
You can use first over your window to get the first value of created_at; min would also work in this case.
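For instance, either of the expressions below produces the same startDate on this data; min only matches first here because created_at happens to be non-decreasing within each (type, key) group when ordered by timestamp. A sketch:

val windowSpec = Window.partitionBy("type", "key").orderBy("timestamp")

// first: created_at of the first row in each ordered partition
test.withColumn("startDate", first(col("created_at")).over(windowSpec))

// min: running minimum over the default frame, identical to first here
test.withColumn("startDate", min(col("created_at")).over(windowSpec))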
The second one is a little trickier. You need to use lead, and keep in mind that the result of lead over a window will always be null for the last row in the window.
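To see that null-padding in isolation, here is a tiny standalone example (with made-up key/ts columns, not your data):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("key").orderBy("ts")
spark.createDataFrame(Seq(("a", 1), ("a", 2), ("b", 3))).toDF("key", "ts")
  .withColumn("next_ts", lead(col("ts"), 1).over(w))
  .show()
// The last row of each partition has no successor, so next_ts is null:
// ("a", 1) -> 2, ("a", 2) -> null, ("b", 3) -> null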
Putting it together on your sample data:

import java.sql.Timestamp

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Schema for the sample data; created_at holds epoch seconds
val schema = List(
  StructField("type", StringType, true),
  StructField("key", StringType, true),
  StructField("tg", StringType, true),
  StructField("created_at", IntegerType, true),
  StructField("timestamp", TimestampType, true),
  StructField("row_number", IntegerType, true)
)
val data = Seq(
  Row("device_id", "essentials", "template", 1600269347, Timestamp.valueOf("2020-09-21 19:08:05"), 1),
  Row("device_id", "experiment", "t1", 1599721314, Timestamp.valueOf("2020-09-17 01:37:17"), 1),
  Row("device_id", "experiment", "v1", 1600228007, Timestamp.valueOf("2020-09-21 18:07:53"), 2),
  Row("device_id", "experiment", "c1", 1605221085, Timestamp.valueOf("2020-09-21 18:07:53"), 3),
  Row("test", "t_key", "t1", 1599714939, Timestamp.valueOf("2020-09-16 01:37:55"), 1),
  Row("test", "t_key", "t2", 1600084857, Timestamp.valueOf("2020-09-21 17:08:23"), 2)
)
val test = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
val windowSpec = Window.partitionBy("type", "key").orderBy("timestamp")
test
  // first value of created_at within each (type, key) group, ordered by timestamp
  .withColumn("startDate", first(col("created_at")).over(windowSpec))
  // next row's created_at becomes this row's endDate, but only when tg changes;
  // lead yields null on the last row of each partition, so endDate stays null there
  .withColumn("endDate", when(
    lead(col("tg"), 1).over(windowSpec).isNotNull &&
    lead(col("tg"), 1).over(windowSpec) =!= col("tg"),
    lead(col("created_at"), 1).over(windowSpec)
  ).otherwise(lit(null).cast(IntegerType)))
  .show()
+---------+----------+--------+----------+-------------------+----------+----------+----------+
| type| key| tg|created_at| timestamp|row_number| startDate| endDate|
+---------+----------+--------+----------+-------------------+----------+----------+----------+
|device_id|essentials|template|1600269347|2020-09-21 19:08:05| 1|1600269347| null|
|device_id|experiment| t1|1599721314|2020-09-17 01:37:17| 1|1599721314|1600228007|
|device_id|experiment| v1|1600228007|2020-09-21 18:07:53| 2|1599721314|1605221085|
|device_id|experiment| c1|1605221085|2020-09-21 18:07:53| 3|1599721314| null|
| test| t_key| t1|1599714939|2020-09-16 01:37:55| 1|1599714939|1600084857|
| test| t_key| t2|1600084857|2020-09-21 17:08:23| 2|1599714939| null|
+---------+----------+--------+----------+-------------------+----------+----------+----------+
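If you want actual dates instead of epoch seconds in startDate/endDate, one possible follow-up (a sketch; result stands for the DataFrame computed above, not a name from your code):

// from_unixtime converts epoch seconds to a "yyyy-MM-dd HH:mm:ss" string,
// to_date keeps only the date part; null endDate values stay null
result
  .withColumn("startDate", to_date(from_unixtime(col("startDate"))))
  .withColumn("endDate", to_date(from_unixtime(col("endDate"))))
  .show()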