I have a PySpark DataFrame structured as follows.
| date_id | anchor | subs_ids |
| -------- | ------ | ----------- |
| 2023-10-01 | 123 | ['456','345']|
| 2023-10-01 | 256 | ['278','875']|
| 2023-10-03 | 123 | ['703','409']|
| 2023-10-03 | 256 | ['801','704']|
| 2023-10-06 | 123 | ['398','209']|
| 2023-10-06 | 256 | ['659','783']|
This DataFrame is partitioned by date_id, and there are missing days in between (e.g., 2023-10-02, 2023-10-04, and 2023-10-05).
I would like to create additional rows for the missing dates, where each new row carries the respective date ID and the same column values as the most recent earlier partition.
For example, for the missing date 2023-10-02, I want to create two additional rows with that date ID and the same column values as in 2023-10-01. Similarly, for 2023-10-04 and 2023-10-05, I want to create two additional rows per date with the same column values as in 2023-10-03.
The desired final table, in ascending order of date_id, is shown below.
| date_id | anchor | subs_ids |
| -------- | ------ | ----------- |
| 2023-10-01 | 123 | ['456','345']|
| 2023-10-01 | 256 | ['278','875']|
| 2023-10-02 | 123 | ['456','345']|
| 2023-10-02 | 256 | ['278','875']|
| 2023-10-03 | 123 | ['703','409']|
| 2023-10-03 | 256 | ['801','704']|
| 2023-10-04 | 123 | ['703','409']|
| 2023-10-04 | 256 | ['801','704']|
| 2023-10-05 | 123 | ['703','409']|
| 2023-10-05 | 256 | ['801','704']|
| 2023-10-06 | 123 | ['398','209']|
| 2023-10-06 | 256 | ['659','783']|
How can I achieve this in Python, specifically with PySpark functionality?
I've been trying various approaches without success. Any help or guidance would be greatly appreciated. Thank you!
You can create a list of dates for each gap using the `sequence` function and then `explode` it. A `lead` window over each `anchor` gives the next existing `date_id`, so the sequence covers every day from the current `date_id` up to the day before the next one.
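Assuming your data lives in a DataFrame called `data_sdf` with `date_id` as a DateType column (the names and types here are taken from your tables; cast `date_id` to a date first if it is stored as a string), the sample can be recreated like this:

from datetime import date
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# recreate the sample data; date_id must be a date for sequence()/date_add() to work
data_sdf = spark.createDataFrame(
    [
        (date(2023, 10, 1), 123, ['456', '345']),
        (date(2023, 10, 1), 256, ['278', '875']),
        (date(2023, 10, 3), 123, ['703', '409']),
        (date(2023, 10, 3), 256, ['801', '704']),
        (date(2023, 10, 6), 123, ['398', '209']),
        (date(2023, 10, 6), 256, ['659', '783']),
    ],
    ['date_id', 'anchor', 'subs_ids'],
)

With that in place, the gap-filling itself: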
from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

# for each anchor, build the dates from the current date_id up to the day before the
# next existing date_id, then explode; coalesce keeps the last date, where lead is null
data_sdf. \
    withColumn('next_date_id', func.lead('date_id').over(wd.partitionBy('anchor').orderBy('date_id'))). \
    withColumn('dtseq', func.expr('sequence(date_id, date_add(next_date_id, -1), interval 1 day)')). \
    selectExpr('explode(coalesce(dtseq, array(date_id))) as date_id', 'anchor', 'subs_ids'). \
    orderBy('date_id', 'anchor'). \
    show(truncate=False)
# +----------+------+----------+
# |date_id   |anchor|subs_ids  |
# +----------+------+----------+
# |2023-10-01|123 |[456, 345]|
# |2023-10-01|256 |[278, 875]|
# |2023-10-02|123 |[456, 345]|
# |2023-10-02|256 |[278, 875]|
# |2023-10-03|123 |[703, 409]|
# |2023-10-03|256 |[801, 704]|
# |2023-10-04|123 |[703, 409]|
# |2023-10-04|256 |[801, 704]|
# |2023-10-05|123 |[703, 409]|
# |2023-10-05|256 |[801, 704]|
# |2023-10-06|123 |[398, 209]|
# |2023-10-06|256 |[659, 783]|
# +----------+------+----------+
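If you want the result back as a DataFrame rather than a printout, drop the `show()` and keep the chained expression as a variable; since `date_id` is your partition column, you could then write the filled table back partitioned by it. A minimal sketch (`filled_sdf` and the output path are placeholder names, and the Parquet format and overwrite mode are assumptions on my part):

# same chain as above, minus orderBy/show, kept as a DataFrame
filled_sdf = data_sdf. \
    withColumn('next_date_id', func.lead('date_id').over(wd.partitionBy('anchor').orderBy('date_id'))). \
    withColumn('dtseq', func.expr('sequence(date_id, date_add(next_date_id, -1), interval 1 day)')). \
    selectExpr('explode(coalesce(dtseq, array(date_id))) as date_id', 'anchor', 'subs_ids')

# write back partitioned by date_id (path, format and mode are placeholders)
filled_sdf.write.partitionBy('date_id').mode('overwrite').parquet('/path/to/output')

Note that the `coalesce(dtseq, array(date_id))` is what keeps the last date per anchor (2023-10-06 here): `lead` returns null there, so `sequence` would be null and `explode` would otherwise drop those rows.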