Tags: pyspark, grouping

Making different groups of sequential null data in PySpark


I have data that contains sequential nulls, and I want to assign each run of sequential nulls to its own group.

The data looks like this:

group_num  days      time  usage
1          20200101  1     10
1          20200101  2     10
1          20200101  3     null
2          20200102  1     30
2          20200102  2     null
2          20200102  3     null
2          20200102  4     50
2          20200102  5     null
3          20200105  10    null
3          20200105  11    null
3          20200105  12    5
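
In case it helps to reproduce the problem, here is a minimal sketch of how this sample could be built as a Spark DataFrame (assuming an active SparkSession; None stands for the nulls in usage):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data as shown above; None becomes null in the "usage" column
data = [
    (1, 20200101, 1, 10),
    (1, 20200101, 2, 10),
    (1, 20200101, 3, None),
    (2, 20200102, 1, 30),
    (2, 20200102, 2, None),
    (2, 20200102, 3, None),
    (2, 20200102, 4, 50),
    (2, 20200102, 5, None),
    (3, 20200105, 10, None),
    (3, 20200105, 11, None),
    (3, 20200105, 12, 5),
]
df = spark.createDataFrame(data, ['group_num', 'days', 'time', 'usage'])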

What I want to do is label the null rows in usage with a null_group.

Consecutive null rows should get the same null_group; nulls that are not consecutive, or that belong to a different group_num, should get different null_groups.

group_num  days      time  usage  null_group
1          20200101  1     10
1          20200101  2     10
1          20200101  3     null   group1
2          20200102  1     30
2          20200102  2     null   group2
2          20200102  3     null   group2
2          20200102  4     50
2          20200102  5     null   group3
3          20200105  10    null   group4
3          20200105  11    null   group4
3          20200105  12    5

Alternatively, a new DataFrame that contains only the null rows with their groups would also work:

group_num  days      time  usage  null_group
1          20200101  3     null   group1
2          20200102  2     null   group2
2          20200102  3     null   group2
2          20200102  5     null   group3
3          20200105  10    null   group4
3          20200105  11    null   group4

The null_group could also be numeric, like this:

group_num  days      time  usage  null_group
1          20200101  3     null   1
2          20200102  2     null   2
2          20200102  3     null   2
2          20200102  5     null   3
3          20200105  10    null   4
3          20200105  11    null   4

Can anyone help with this problem? I thought I could do this with PySpark's window functions, but it didn't work very well. I think I have to use PySpark because the original data is too large to handle in plain Python.


Solution

  • This looks a bit complicated, but the last two parts are only there to display the result correctly. The main logic goes like this:

    1. calculate the "time" difference "dt" between rows (it needs to be 1 for rows to fall into the same "null_group")
    2. generate a "key" from the "usage" and "dt" columns
    3. use the trick for labelling consecutive rows (originally described for pandas: https://www.codeforests.com/2021/03/30/group-consecutive-rows-in-pandas/); a standalone sketch of this step follows the list
    4. rename and manipulate the labels to get the desired result
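
    To make step 3 concrete, here is a minimal standalone sketch of the consecutive-row labelling trick on a small hypothetical single-column DataFrame (the names demo, idx and v are only for illustration; it assumes the SparkSession spark from the question is available): a row gets a "change" flag whenever its value differs from the previous row, and a running sum of the flags turns each run of equal values into one label.

    from pyspark.sql import functions as F, Window

    w = Window.orderBy('idx')
    w_cum = w.rowsBetween(Window.unboundedPreceding, 0)

    demo = spark.createDataFrame(
        [(1, 'a'), (2, 'a'), (3, 'b'), (4, 'b'), (5, 'a')], ['idx', 'v'])

    labelled = (
        demo
        # flag is 1 whenever the value differs from the previous row
        # (and for the first row, where lag() returns null)
        .withColumn('change', F.coalesce(
            (F.col('v') != F.lag('v').over(w)).cast('int'), F.lit(1)))
        # running sum of the flags: consecutive equal values share one label
        .withColumn('label', F.sum('change').over(w_cum))
    )
    labelled.show()  # labels: 1, 1, 2, 2, 3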

    Full solution:

    from pyspark.sql import functions as F, Window

    w = Window.partitionBy('group_num').orderBy('time')
    w_cumsum = w.rowsBetween(Window.unboundedPreceding, 0)
    
    # main logic
    df_tmp = (
        df
        # step 1: "dt" = gap to the previous "time" (1 for the first row in a group)
        .withColumn('dt', F.coalesce(F.col('time') - F.lag('time').over(w), F.lit(1)))
        # step 2: "key" combines "usage" and "dt"
        .withColumn('key', F.concat_ws('-', 'usage', 'dt'))
        # step 3: label runs of identical consecutive keys with a cumulative sum
        .withColumn('prev_key', F.lag('key').over(w))
        .withColumn('diff', F.coalesce((F.col('key') != F.col('prev_key')).cast('int'), F.lit(1)))
        .withColumn('cumsum', F.sum('diff').over(w_cumsum))
        # build a key per null group ("group_num-cumsum"), only for null "usage" rows
        .withColumn('null_group_key',
                    F.when(F.isnull('usage'), F.concat_ws('-', 'group_num', 'cumsum')).otherwise(None))
    )
    
    # map to generate required group names
    df_map = (
        df_tmp
        .select('null_group_key')
        .distinct()
        .dropna()
        .sort('null_group_key')
        # note: monotonically_increasing_id() is only guaranteed to be increasing,
        # not consecutive, so the numbers may have gaps if df_map spans several partitions
        .withColumn('null_group', F.concat(F.lit('group'), F.monotonically_increasing_id() + F.lit(1)))
    )
    
    # rename and display as needed
    (
        df_tmp
        .join(df_map, 'null_group_key', 'left')
        .fillna('', 'null_group')
        .select('group_num', 'days', 'time', 'usage', 'null_group')
        .sort('group_num', 'time')
        .show()
    )
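
    If only the second desired output is needed (just the null rows, with a numeric null_group), the mapping join can be skipped. A minimal sketch building on df_tmp from above, using dense_rank over the null group keys so the numbers come out consecutive:

    # keep only the null rows and number each null_group_key 1, 2, 3, ...
    w_rank = Window.orderBy('null_group_key')

    (
        df_tmp
        .filter(F.isnull('usage'))
        .withColumn('null_group', F.dense_rank().over(w_rank))
        .select('group_num', 'days', 'time', 'usage', 'null_group')
        .sort('group_num', 'time')
        .show()
    )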