I have data that contains sequential nulls, and I want to assign each consecutive run of nulls to its own group.
The data looks like this:
group_num | days | time | usage |
---|---|---|---|
1 | 20200101 | 1 | 10 |
1 | 20200101 | 2 | 10 |
1 | 20200101 | 3 | null |
2 | 20200102 | 1 | 30 |
2 | 20200102 | 2 | null |
2 | 20200102 | 3 | null |
2 | 20200102 | 4 | 50 |
2 | 20200102 | 5 | null |
3 | 20200105 | 10 | null |
3 | 20200105 | 11 | null |
3 | 20200105 | 12 | 5 |
What I want to do is add a null_group column that groups the null values in usage.
Nulls should share the same null_group if they are consecutive, and get different null_groups if they are not consecutive or belong to different group_num values.
group_num | days | time | usage | null_group |
---|---|---|---|---|
1 | 20200101 | 1 | 10 | |
1 | 20200101 | 2 | 10 | |
1 | 20200101 | 3 | null | group1 |
2 | 20200102 | 1 | 30 | |
2 | 20200102 | 2 | null | group2 |
2 | 20200102 | 3 | null | group2 |
2 | 20200102 | 4 | 50 | |
2 | 20200102 | 5 | null | group3 |
3 | 20200105 | 10 | null | group4 |
3 | 20200105 | 11 | null | group4 |
3 | 20200105 | 12 | 5 | |
Alternatively, I could create a new dataset that contains only the null rows, each with its group:
group_num | days | time | usage | null_group |
---|---|---|---|---|
1 | 20200101 | 3 | null | group1 |
2 | 20200102 | 2 | null | group2 |
2 | 20200102 | 3 | null | group2 |
2 | 20200102 | 5 | null | group3 |
3 | 20200105 | 10 | null | group4 |
3 | 20200105 | 11 | null | group4 |
null_group can also be numeric, like below:
group_num | days | time | usage | null_group |
---|---|---|---|---|
1 | 20200101 | 3 | null | 1 |
2 | 20200102 | 2 | null | 2 |
2 | 20200102 | 3 | null | 2 |
2 | 20200102 | 5 | null | 3 |
3 | 20200105 | 10 | null | 4 |
3 | 20200105 | 11 | null | 4 |
Can anyone help with this problem? I thought I could do it with PySpark's window functions, but it didn't work very well. I think I have to use PySpark because the original data is too large to handle in plain Python.
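For reference, a minimal snippet that reproduces the sample data (it assumes an active SparkSession; the column names and values are taken from the tables above):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the tables above; None represents a null usage.
df = spark.createDataFrame(
    [
        (1, 20200101, 1, 10), (1, 20200101, 2, 10), (1, 20200101, 3, None),
        (2, 20200102, 1, 30), (2, 20200102, 2, None), (2, 20200102, 3, None),
        (2, 20200102, 4, 50), (2, 20200102, 5, None),
        (3, 20200105, 10, None), (3, 20200105, 11, None), (3, 20200105, 12, 5),
    ],
    ['group_num', 'days', 'time', 'usage'],
)
```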
This looks a bit complicated, but the last two parts are only there to produce the requested group names and display format. The main logic goes like this: within each group_num, ordered by time, compute the gap to the previous row (dt), build a key from usage and dt, flag every row where the key differs from the previous one, and take a running sum of those flags. Rows in the same run of consecutive nulls end up with the same running-sum value, so that value, qualified by group_num, identifies the null group.
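Here is that change-detection pattern in isolation, as a minimal sketch on made-up toy data (the DataFrame and column names here are illustrative, not from the question): flag changes with lag, then cumulatively sum the flags to number the segments.

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.getOrCreate()

toy = spark.createDataFrame(
    [(1, 'a'), (2, 'a'), (3, 'b'), (4, 'b'), (5, 'a')], ['time', 'val'])

w = Window.orderBy('time')
segments = (
    toy
    # 1 where val differs from the previous row (or there is no previous row)
    .withColumn('changed', F.coalesce(
        (F.col('val') != F.lag('val').over(w)).cast('int'), F.lit(1)))
    # running sum of the flags: consecutive equal values share an id
    .withColumn('segment_id', F.sum('changed').over(
        w.rowsBetween(Window.unboundedPreceding, 0)))
)
segments.show()  # segment_id comes out as 1, 1, 2, 2, 3
```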
Full solution:
```python
import pyspark.sql.functions as F
from pyspark.sql import Window

w = Window.partitionBy('group_num').orderBy('time')
w_cumsum = w.rowsBetween(Window.unboundedPreceding, 0)

# main logic: flag segment boundaries, then number the segments
df_tmp = (
    df
    # gap to the previous time within the group (1 for the first row)
    .withColumn('dt', F.coalesce(F.col('time') - F.lag('time').over(w), F.lit(1)))
    # concat_ws skips nulls, so a null row's key is just its dt;
    # non-null rows always contain a '-', so the two cannot collide
    .withColumn('key', F.concat_ws('-', 'usage', 'dt'))
    .withColumn('prev_key', F.lag('key').over(w))
    # 1 where the key changes (or there is no previous row), else 0
    .withColumn('diff', F.coalesce((F.col('key') != F.col('prev_key')).cast('int'), F.lit(1)))
    # running sum of the flags -> segment id within the group
    .withColumn('cumsum', F.sum('diff').over(w_cumsum))
    # keep a group-qualified segment key only for null rows
    .withColumn('null_group_key',
                F.when(F.isnull('usage'), F.concat_ws('-', 'group_num', 'cumsum')))
)

# map each distinct key to a consecutive group name; row_number is used
# because monotonically_increasing_id does not guarantee consecutive values
w_name = Window.orderBy('group_num', 'cumsum')
df_map = (
    df_tmp
    .select('group_num', 'cumsum', 'null_group_key')
    .dropna(subset=['null_group_key'])
    .distinct()
    .withColumn('null_group',
                F.concat(F.lit('group'), F.row_number().over(w_name).cast('string')))
    .select('null_group_key', 'null_group')
)

# rename and display as needed
(
    df_tmp
    .join(df_map, 'null_group_key', 'left')
    .fillna('', subset='null_group')
    .select('group_num', 'days', 'time', 'usage', 'null_group')
    .sort('group_num', 'time')
    .show()
)
```
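If you only need the second output (the null rows alone), an inner join drops the non-null rows; this is a small variation on the display step above:

```python
# null rows only, with their group names
(
    df_tmp
    .join(df_map, 'null_group_key', 'inner')  # inner join keeps only null rows
    .select('group_num', 'days', 'time', 'usage', 'null_group')
    .sort('group_num', 'time')
    .show()
)
```

For the numeric variant, build df_map with F.row_number().over(w_name) directly as null_group instead of concatenating the 'group' prefix.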