
Assign RunID on changing category


I would like to assign a RunID to a person performing different tasks. A new RunID should be assigned whenever the task changes and, of course, whenever the person changes, too.

This is some example data with the expected RunID already added.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Columns: Person, Task, Time (ISO date string), expected RunID
df = spark.createDataFrame([
            ['A', 'T1', '2023-01-01', 1],
            ['A', 'T1', '2023-01-02', 1],
            ['A', 'T2', '2023-01-03', 2],
            ['A', 'T2', '2023-01-04', 2],
            ['A', 'T2', '2023-01-05', 2],
            ['A', 'T1', '2023-01-06', 3],
            ['A', 'T1', '2023-01-07', 3],
            ['A', 'T1', '2023-01-08', 3],
            ['A', 'T1', '2023-01-09', 3],
            ['A', 'T1', '2023-01-10', 3],
            ['B', 'T1', '2023-01-01', 4],
            ['B', 'T1', '2023-01-02', 4],
            ['B', 'T1', '2023-01-03', 4],
            ['B', 'T1', '2023-01-04', 4],
            ['B', 'T1', '2023-01-05', 4],
            ],
            ['Person', 'Task', 'Time', 'expectedRunID'])
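To pin down the rule independently of Spark, here is a plain-Python sketch (assign_run_ids is a hypothetical helper, not part of PySpark). It assumes (Person, Time) is unique and that Time is an ISO date string, so it sorts chronologically. A run is a maximal stretch of consecutive rows, in person/time order, that share the same person and task, which is exactly what itertools.groupby produces:

```python
from itertools import groupby

# The question's data: (Person, Task, Time, expectedRunID)
data = (
    [('A', 'T1', f'2023-01-{d:02d}', 1) for d in (1, 2)]
    + [('A', 'T2', f'2023-01-{d:02d}', 2) for d in (3, 4, 5)]
    + [('A', 'T1', f'2023-01-{d:02d}', 3) for d in range(6, 11)]
    + [('B', 'T1', f'2023-01-{d:02d}', 4) for d in range(1, 6)]
)


def assign_run_ids(rows):
    """Hypothetical helper: return (row, run_id) pairs for (person, task, time, ...) rows."""
    # Sort by person, then time ('YYYY-MM-DD' strings sort chronologically)
    ordered = sorted(rows, key=lambda r: (r[0], r[2]))
    result = []
    # groupby only merges *consecutive* rows with the same key, so each
    # group is one uninterrupted run of one task by one person
    groups = groupby(ordered, key=lambda r: (r[0], r[1]))
    for run_id, (_key, run) in enumerate(groups, start=1):
        result.extend((row, run_id) for row in run)
    return result


for row, run_id in assign_run_ids(data):
    assert run_id == row[3], row  # computed ID matches expectedRunID
print("all run IDs match expectedRunID")
```

The same (person, task) key can appear in several groups, which is what separates this from ranking by task value.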

I thought I could do it by defining a window and assigning a dense_rank:

from pyspark.sql import Window
from pyspark.sql import functions as F
window = Window.partitionBy("Person").orderBy("Task")
df.withColumn("runID", F.dense_rank().over(window))

This partly works, but it does not assign the recurring T1 rows of person A to runID 3: since they have the same person-task combination as the first run, they are also assigned runID 1.
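A plain-Python emulation (not Spark itself; dense_rank_by_task is a hypothetical name) of what dense_rank computes over Window.partitionBy("Person").orderBy("Task") shows two separate problems. The rank depends only on the Task value, not on time order, so A's later T1 block gets 1 again. And because the window is partitioned by Person, numbering restarts at 1 for B instead of continuing at 4:

```python
# The question's rows: (Person, Task, Time)
rows = (
    [('A', 'T1', f'2023-01-{d:02d}') for d in (1, 2)]
    + [('A', 'T2', f'2023-01-{d:02d}') for d in (3, 4, 5)]
    + [('A', 'T1', f'2023-01-{d:02d}') for d in range(6, 11)]
    + [('B', 'T1', f'2023-01-{d:02d}') for d in range(1, 6)]
)


def dense_rank_by_task(rows):
    # Within each person, rank the distinct Task values 1, 2, ... in sorted
    # order; every row with the same task gets the same rank, whatever its time
    ranks = {}
    for person in {r[0] for r in rows}:
        tasks = sorted({r[1] for r in rows if r[0] == person})
        for i, task in enumerate(tasks, start=1):
            ranks[(person, task)] = i
    return [ranks[(p, t)] for p, t, _ in rows]


print(dense_rank_by_task(rows))
# [1, 1, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
```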

Any ideas how to handle this?


Solution

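  • This would work: order the rows by person and time, set a change flag to 1 whenever there is no previous row or the previous row has a different person or task, and take the running sum of that flag as the run ID: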

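    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # Order all rows by person, then by time within each person
    w = Window.orderBy(F.asc("Person"), F.asc("Time"))

    (df
        # change = 0 if the previous row has the same person and task, else 1 (first row: lag is null -> 1)
        .withColumn("change", F.when((F.lag("Task").over(w) == F.col("Task")) & (F.lag("Person").over(w) == F.col("Person")), 0).otherwise(1))
        # the running sum of the change flags is the run ID
        .withColumn("run_id", F.sum("change").over(w))
        .show())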
    

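    Input (the question's data plus one extra row, B / T2 / 2023-01-06, to show a task change for person B; under the question's rule its expectedRunID would be 5 rather than 4):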

    +------+----+----------+-------------+
    |Person|Task|      Time|expectedRunID|
    +------+----+----------+-------------+
    |     A|  T1|2023-01-01|            1|
    |     A|  T1|2023-01-02|            1|
    |     A|  T2|2023-01-03|            2|
    |     A|  T2|2023-01-04|            2|
    |     A|  T2|2023-01-05|            2|
    |     A|  T1|2023-01-06|            3|
    |     A|  T1|2023-01-07|            3|
    |     A|  T1|2023-01-08|            3|
    |     A|  T1|2023-01-09|            3|
    |     A|  T1|2023-01-10|            3|
    |     B|  T1|2023-01-01|            4|
    |     B|  T1|2023-01-02|            4|
    |     B|  T1|2023-01-03|            4|
    |     B|  T1|2023-01-04|            4|
    |     B|  T1|2023-01-05|            4|
    |     B|  T2|2023-01-06|            4|
    +------+----+----------+-------------+
    

    Output (I kept the intermediate change column for clarity; you can drop it):

    +------+----+----------+-------------+------+------+
    |Person|Task|      Time|expectedRunID|change|run_id|
    +------+----+----------+-------------+------+------+
    |     A|  T1|2023-01-01|            1|     1|     1|
    |     A|  T1|2023-01-02|            1|     0|     1|
    |     A|  T2|2023-01-03|            2|     1|     2|
    |     A|  T2|2023-01-04|            2|     0|     2|
    |     A|  T2|2023-01-05|            2|     0|     2|
    |     A|  T1|2023-01-06|            3|     1|     3|
    |     A|  T1|2023-01-07|            3|     0|     3|
    |     A|  T1|2023-01-08|            3|     0|     3|
    |     A|  T1|2023-01-09|            3|     0|     3|
    |     A|  T1|2023-01-10|            3|     0|     3|
    |     B|  T1|2023-01-01|            4|     1|     4|
    |     B|  T1|2023-01-02|            4|     0|     4|
    |     B|  T1|2023-01-03|            4|     0|     4|
    |     B|  T1|2023-01-04|            4|     0|     4|
    |     B|  T1|2023-01-05|            4|     0|     4|
    |     B|  T2|2023-01-06|            4|     1|     5|
    +------+----+----------+-------------+------+------+
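To see why the running sum reproduces the expected IDs, here is a plain-Python trace (not Spark) of the two columns the answer computes on its 16-row input. The change list mimics comparing F.lag("Task") and F.lag("Person") with the current row (a missing previous row counts as a change), and run_id mimics F.sum("change").over(w), i.e. a cumulative sum in (Person, Time) order:

```python
from itertools import accumulate

# The answer's input: the question's rows plus one extra (B, T2) row
rows = (
    [('A', 'T1', f'2023-01-{d:02d}') for d in (1, 2)]
    + [('A', 'T2', f'2023-01-{d:02d}') for d in (3, 4, 5)]
    + [('A', 'T1', f'2023-01-{d:02d}') for d in range(6, 11)]
    + [('B', 'T1', f'2023-01-{d:02d}') for d in range(1, 6)]
    + [('B', 'T2', '2023-01-06')]
)
ordered = sorted(rows, key=lambda r: (r[0], r[2]))  # like orderBy(Person, Time)

# change: 0 only if the previous row has the same (person, task), else 1
change = [
    0 if i > 0 and ordered[i - 1][:2] == row[:2] else 1
    for i, row in enumerate(ordered)
]
# run_id: running total of change, like F.sum("change").over(w)
run_id = list(accumulate(change))

print(change)  # [1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1]
print(run_id)  # [1, 1, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5]
```

Two assumptions carry over to the Spark version. First, (Person, Time) should be unique: a window with orderBy uses a RANGE frame from the start up to the current row by default, so rows tied on the ordering columns would share one running total. Second, Window.orderBy without partitionBy moves all rows into a single partition, which is fine for small data but may be slow on large tables.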