I would like to assign a RunID to each run of consecutive rows in which a person performs the same task. A new RunID should be assigned whenever the task changes, and also whenever the person changes.
This is some example data with the expected RunID already added.
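from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

df = spark.createDataFrame(sc.parallelize([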
['A', 'T1', '2023-01-01', 1],
['A', 'T1', '2023-01-02', 1],
['A', 'T2', '2023-01-03', 2],
['A', 'T2', '2023-01-04', 2],
['A', 'T2', '2023-01-05', 2],
['A', 'T1', '2023-01-06', 3],
['A', 'T1', '2023-01-07', 3],
['A', 'T1', '2023-01-08', 3],
['A', 'T1', '2023-01-09', 3],
['A', 'T1', '2023-01-10', 3],
['B', 'T1', '2023-01-01', 4],
['B', 'T1', '2023-01-02', 4],
['B', 'T1', '2023-01-03', 4],
['B', 'T1', '2023-01-04', 4],
['B', 'T1', '2023-01-05', 4],
]),
['Person', 'Task', 'Time', 'expectedRunID'])
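I thought I could do it by defining a window and assigning a dense_rank over it:
from pyspark.sql import Window
from pyspark.sql import functions as F

# dense_rank numbers the distinct Task values within each Person
window = Window.partitionBy("Person").orderBy("Task")
df.withColumn("runID", F.dense_rank().over(window))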
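This partly works, but the recurring T1 for person A is not assigned runID 3: because it is the same person-task combination, it is assigned runID 1 again. Also, because the window is partitioned by Person, the numbering restarts at 1 for person B instead of continuing at 4.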
Any ideas how to handle this?
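For reference, here is a plain-Python sketch (not Spark code; the helper name dense_rank_by_task is made up for illustration) that imitates what dense_rank over partitionBy("Person").orderBy("Task") computes on the data above. Since the rank depends only on the sorted distinct Task values within each person, the later T1 rows of person A fall back to 1, and person B starts again at 1.

```python
# Plain-Python imitation of F.dense_rank() over Window.partitionBy("Person").orderBy("Task").
# dense_rank_by_task is a hypothetical helper for illustration only.
rows = [
    ("A", "T1", "2023-01-01"), ("A", "T1", "2023-01-02"),
    ("A", "T2", "2023-01-03"), ("A", "T2", "2023-01-04"), ("A", "T2", "2023-01-05"),
    ("A", "T1", "2023-01-06"), ("A", "T1", "2023-01-07"), ("A", "T1", "2023-01-08"),
    ("A", "T1", "2023-01-09"), ("A", "T1", "2023-01-10"),
    ("B", "T1", "2023-01-01"), ("B", "T1", "2023-01-02"), ("B", "T1", "2023-01-03"),
    ("B", "T1", "2023-01-04"), ("B", "T1", "2023-01-05"),
]


def dense_rank_by_task(rows):
    """Return {(person, time): rank}, ranking distinct tasks densely within each person."""
    ranks = {}
    for person in sorted({p for p, _, _ in rows}):
        # The rank only looks at the distinct values of the order-by column (Task),
        # so every T1 row of a person gets the same rank, wherever it occurs in time.
        tasks = sorted({t for p, t, _ in rows if p == person})
        rank_of = {task: i + 1 for i, task in enumerate(tasks)}
        for p, t, time in rows:
            if p == person:
                ranks[(p, time)] = rank_of[t]
    return ranks


ranks = dense_rank_by_task(rows)
print([ranks[(p, time)] for p, _, time in rows])
# → [1, 1, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
```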
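This should work: flag every row whose Person or Task differs from the previous row (ordered by Person and Time), then take a running sum of those flags: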
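from pyspark.sql import Window, functions as F

# No partitionBy, so run IDs keep counting across persons (Spark warns that this moves all rows to one partition)
w = Window.orderBy(F.asc("Person"), F.asc("Time"))
(df
 # change = 1 when Person or Task differs from the previous row (lag is null on the first row), else 0
 .withColumn("change", F.when((F.lag("Task").over(w) == F.col("Task")) & (F.lag("Person").over(w) == F.col("Person")), 0).otherwise(1))
 # the running sum of the change flags is the run ID
 .withColumn("run_id", F.sum("change").over(w))
 .show())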
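To see why a change flag plus a running sum yields run IDs, here is a minimal plain-Python sketch of the same logic. It assumes the rows fit in memory and that Time is an ISO yyyy-MM-dd string (so it sorts correctly as text); assign_run_ids is a hypothetical name, and this is only an illustration, not a replacement for the Spark code.

```python
# Plain-Python sketch (no Spark) of the lag + running-sum logic.
# assign_run_ids is a hypothetical helper used only for illustration.
rows = [
    ("A", "T1", "2023-01-01"), ("A", "T1", "2023-01-02"),
    ("A", "T2", "2023-01-03"), ("A", "T2", "2023-01-04"), ("A", "T2", "2023-01-05"),
    ("A", "T1", "2023-01-06"), ("A", "T1", "2023-01-07"), ("A", "T1", "2023-01-08"),
    ("A", "T1", "2023-01-09"), ("A", "T1", "2023-01-10"),
    ("B", "T1", "2023-01-01"), ("B", "T1", "2023-01-02"), ("B", "T1", "2023-01-03"),
    ("B", "T1", "2023-01-04"), ("B", "T1", "2023-01-05"), ("B", "T2", "2023-01-06"),
]


def assign_run_ids(rows):
    """Return (person, task, time, change, run_id) tuples in (Person, Time) order."""
    result = []
    prev = None  # stands in for lag(): the first row has no previous row
    run_id = 0
    for person, task, time in sorted(rows, key=lambda r: (r[0], r[2])):
        # change = 0 only when both Person and Task equal the previous row's values
        change = 0 if prev == (person, task) else 1
        run_id += change  # running total, like F.sum("change").over(w)
        result.append((person, task, time, change, run_id))
        prev = (person, task)
    return result


print([row[4] for row in assign_run_ids(rows)])
# → [1, 1, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5]
```

Because the running sum never resets, the numbering continues from person A into person B, which is why the Spark window is deliberately not partitioned.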
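Input (the question's data plus one extra row for person B with task T2, to show a task change within one person; under the question's rule that row's expectedRunID would be 5 rather than 4):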
+------+----+----------+-------------+
|Person|Task|      Time|expectedRunID|
+------+----+----------+-------------+
|     A|  T1|2023-01-01|            1|
|     A|  T1|2023-01-02|            1|
|     A|  T2|2023-01-03|            2|
|     A|  T2|2023-01-04|            2|
|     A|  T2|2023-01-05|            2|
|     A|  T1|2023-01-06|            3|
|     A|  T1|2023-01-07|            3|
|     A|  T1|2023-01-08|            3|
|     A|  T1|2023-01-09|            3|
|     A|  T1|2023-01-10|            3|
|     B|  T1|2023-01-01|            4|
|     B|  T1|2023-01-02|            4|
|     B|  T1|2023-01-03|            4|
|     B|  T1|2023-01-04|            4|
|     B|  T1|2023-01-05|            4|
|     B|  T2|2023-01-06|            4|
+------+----+----------+-------------+
Output (I kept the intermediate change column to make the logic easier to follow; you can drop it):
+------+----+----------+-------------+------+------+
|Person|Task|      Time|expectedRunID|change|run_id|
+------+----+----------+-------------+------+------+
|     A|  T1|2023-01-01|            1|     1|     1|
|     A|  T1|2023-01-02|            1|     0|     1|
|     A|  T2|2023-01-03|            2|     1|     2|
|     A|  T2|2023-01-04|            2|     0|     2|
|     A|  T2|2023-01-05|            2|     0|     2|
|     A|  T1|2023-01-06|            3|     1|     3|
|     A|  T1|2023-01-07|            3|     0|     3|
|     A|  T1|2023-01-08|            3|     0|     3|
|     A|  T1|2023-01-09|            3|     0|     3|
|     A|  T1|2023-01-10|            3|     0|     3|
|     B|  T1|2023-01-01|            4|     1|     4|
|     B|  T1|2023-01-02|            4|     0|     4|
|     B|  T1|2023-01-03|            4|     0|     4|
|     B|  T1|2023-01-04|            4|     0|     4|
|     B|  T1|2023-01-05|            4|     0|     4|
|     B|  T2|2023-01-06|            4|     1|     5|
+------+----+----------+-------------+------+------+
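As an independent cross-check of the run_id column, the same numbering can be derived in memory with Python's itertools.groupby, which groups only consecutive equal keys (the "gaps and islands" pattern). This is a swapped-in technique for verification only, assuming the same 16 input rows, and is not how Spark evaluates the window.

```python
from itertools import groupby

rows = [
    ("A", "T1", "2023-01-01"), ("A", "T1", "2023-01-02"),
    ("A", "T2", "2023-01-03"), ("A", "T2", "2023-01-04"), ("A", "T2", "2023-01-05"),
    ("A", "T1", "2023-01-06"), ("A", "T1", "2023-01-07"), ("A", "T1", "2023-01-08"),
    ("A", "T1", "2023-01-09"), ("A", "T1", "2023-01-10"),
    ("B", "T1", "2023-01-01"), ("B", "T1", "2023-01-02"), ("B", "T1", "2023-01-03"),
    ("B", "T1", "2023-01-04"), ("B", "T1", "2023-01-05"), ("B", "T2", "2023-01-06"),
]

# Sort the same way as the window: by Person, then Time (ISO date strings sort as text)
ordered = sorted(rows, key=lambda r: (r[0], r[2]))

run_ids = []
# groupby merges only *consecutive* rows with the same (Person, Task) key,
# so each group is exactly one run; enumerate numbers the runs from 1
for run_id, (_, group) in enumerate(groupby(ordered, key=lambda r: (r[0], r[1])), start=1):
    run_ids.extend([run_id] * sum(1 for _ in group))

print(run_ids)
# → [1, 1, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5]
```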