I have an Iceberg table like the one below and am trying to write a Trino query that produces the expected output.
Sample Data
trino:datalakepartncr_trino> select src.ID, src.OUTSTANDING1, src.OUTSTANDING2, src.OUTSTANDINGP, src.LASTACTION, src.LASTACTIONDATE, pos,changed_cols from table_event where src.ID in (4998,4952,4959) order by pos;
ID | OUTSTANDING1 | OUTSTANDING2 | OUTSTANDINGP | LASTACTION | LASTACTIONDATE | pos | changed_cols
------+--------------+--------------+--------------+------------+---------------------+-----------------+---------------------------------------------------------------
4952 | 0.0 | 0 | 0 | NULL | NULL | 177240302252540 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
4952 | NULL | NULL | NULL | NULL | 2022-12-15 13:30:02 | 177240302252701 | [ID, LASTACTION, LASTACTIONDATE]
4952 | NULL | NULL | NULL | ERA | 2022-12-15 13:30:05 | 177240302255887 | [ID, LASTACTION, LASTACTIONDATE]
4959 | 162.94 | 0 | 0 | NULL | NULL | 177240304204719 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
4959 | 160.0 | 0 | 0 | NULL | NULL | 177240304207643 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
4959 | 141.74 | 0 | 0 | NULL | NULL | 177240304209836 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
4959 | NULL | 0 | 0 | NULL | NULL | 177240304212737 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
4959 | NULL | NULL | NULL | CLOSE | 2022-12-15 13:30:07 | 177240304212873 | [ID, LASTACTION, LASTACTIONDATE]
4959 | NULL | NULL | NULL | ERA | 2022-12-15 13:30:07 | 177240304218096 | [ID, LASTACTION, LASTACTIONDATE]
4959 | NULL | NULL | NULL | NULL | NULL | 177240304219369 | [ID, LASTACTION, LASTACTIONDATE]
4998 | 25.0 | 0 | 0 | NULL | NULL | 177330411767496 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
4998 | NULL | NULL | NULL | CREATE | 2022-12-15 16:41:06 | 177330411767632 | [ID, LASTACTION, LASTACTIONDATE]
4998 | 55.0 | 0 | 0 | NULL | NULL | 177330411775790 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
4998 | 55.0 | 0 | 0 | NULL | NULL | 177330411781226 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
The expected result should be the latest value of each column based on POS ordering, along with the rolled-up changed_cols to differentiate data NULLs from Spark-generated NULLs.
The table is created via Spark by reading from a JSON file. We use the changed_cols field to distinguish NULLs that Spark generated because the field was absent from the event from actual NULLs in the data itself.
For example, in the 4th record of ID 4959 the NULL in column OUTSTANDING1 is not due to the field being absent but comes from the data itself, and that can be inferred from the changed_cols value.
Expected output:
ID | OUTSTANDING1 | OUTSTANDING2 | OUTSTANDINGP | LASTACTION | LASTACTIONDATE | changed_cols
------+--------------+--------------+--------------+------------+---------------------+----------------------------------------------------------------------------
4952 | 0.0 | 0 | 0 | ERA | 2022-12-15 13:30:05 | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP, LASTACTION, LASTACTIONDATE]
4959 | NULL | 0 | 0 | NULL | NULL | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP, LASTACTION, LASTACTIONDATE]
4998 | 55.0 | 0 | 0 | NULL | NULL | [ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP]
I am able to roll up the changed_cols and get the distinct values, but getting the latest value is not working:
trino:datalakepartncr_trino> select op_type,src.ID, lag(src.OUTSTANDING1) over (partition by src.ID order by pos),src.outstanding1, pos,array_distinct(flatten(array_agg(changed_cols) over (partition by src.ID order by pos))) from claim_event where src.ID in (4998,4952,4959) and op_type = 'U' order by op_ts, pos
Please provide some ideas on how to achieve this.
Additional sample JSON data to easily load and test
{"table":"TABLEA", "POS": 1, "ID": 244, "COLUMNA": 283.7, "changed_cols": ["ID", "COLUMNA"]}
{"table":"TABLEA", "POS": 2, "ID": 244, "COLUMNA": null, "changed_cols": ["ID", "COLUMNA"]}
{"table":"TABLEA", "POS": 3, "ID": 244, "COLUMNA": 200, "COLUMNC": "CLOSE", "changed_cols": ["ID", "COLUMNA", "COLUMNC"]}
{"table":"TABLEA", "POS": 4, "ID": 244, "COLUMNA": null, "COLUMNB": "user", "COLUMNC": "INTERIM", "COLUMND": 35000, "changed_cols": ["ID", "COLUMNA", "COLUMNB", "COLUMNC", "COLUMND"]}
{"table":"TABLEA", "POS": 5, "ID": 244, "COLUMNB": "user", "COLUMNC": "OPEN", "changed_cols": ["ID", "COLUMNB", "COLUMNC"]}
{"table":"TABLEA", "POS": 6, "ID": 244, "COLUMNB": "user", "COLUMNC": null, "changed_cols": ["ID", "COLUMNB", "COLUMNC"]}
{"table":"TABLEA", "POS": 1, "ID": 245, "COLUMNA": 283.7, "changed_cols": ["ID", "COLUMNA"]}
{"table":"TABLEA", "POS": 2, "ID": 245, "COLUMNB": null, "changed_cols": ["ID", "COLUMNB"]}
{"table":"TABLEA", "POS": 3, "ID": 245, "COLUMNA": 200, "COLUMNC": "CLOSE", "changed_cols": ["ID", "COLUMNA", "COLUMNC"]}
Expected Output of sample JSON:
If we take the latest changed value from the above events for the primary key(ID) it will look like
+---+-------+-------+-------+-------+----------------------------------------+
|ID |COLUMNA|COLUMNB|COLUMNC|COLUMND|COL_LIST |
+---+-------+-------+-------+-------+----------------------------------------+
|244|null |user |null |35000 |[ID, COLUMNA, COLUMNC, COLUMNB, COLUMND]|
|245|200.0 |null |CLOSE |null |[ID, COLUMNA, COLUMNB, COLUMNC] |
+---+-------+-------+-------+-------+----------------------------------------+
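Since the reduction itself is easy to get wrong, here is a minimal Python sketch of it (my own illustration, not Spark or Trino code): fold the events per ID in POS order and overwrite a column only when its name appears in changed_cols, so absent fields never clobber earlier values.

```python
from collections import defaultdict

# Sketch of the desired reduction over the sample JSON events above:
# apply events per ID in POS order, updating a column only when its
# name appears in changed_cols (so absent fields never overwrite).
events = [
    {"POS": 1, "ID": 244, "COLUMNA": 283.7, "changed_cols": ["ID", "COLUMNA"]},
    {"POS": 2, "ID": 244, "COLUMNA": None, "changed_cols": ["ID", "COLUMNA"]},
    {"POS": 3, "ID": 244, "COLUMNA": 200, "COLUMNC": "CLOSE",
     "changed_cols": ["ID", "COLUMNA", "COLUMNC"]},
    {"POS": 4, "ID": 244, "COLUMNA": None, "COLUMNB": "user", "COLUMNC": "INTERIM",
     "COLUMND": 35000, "changed_cols": ["ID", "COLUMNA", "COLUMNB", "COLUMNC", "COLUMND"]},
    {"POS": 5, "ID": 244, "COLUMNB": "user", "COLUMNC": "OPEN",
     "changed_cols": ["ID", "COLUMNB", "COLUMNC"]},
    {"POS": 6, "ID": 244, "COLUMNB": "user", "COLUMNC": None,
     "changed_cols": ["ID", "COLUMNB", "COLUMNC"]},
    {"POS": 1, "ID": 245, "COLUMNA": 283.7, "changed_cols": ["ID", "COLUMNA"]},
    {"POS": 2, "ID": 245, "COLUMNB": None, "changed_cols": ["ID", "COLUMNB"]},
    {"POS": 3, "ID": 245, "COLUMNA": 200, "COLUMNC": "CLOSE",
     "changed_cols": ["ID", "COLUMNA", "COLUMNC"]},
]

def latest_changed(events):
    state = defaultdict(dict)     # ID -> {column: latest genuinely-changed value}
    col_list = defaultdict(list)  # ID -> rolled-up, de-duplicated changed_cols
    for ev in sorted(events, key=lambda e: (e["ID"], e["POS"])):
        for col in ev["changed_cols"]:
            # ev.get(col) is None both for data NULLs and absent fields,
            # but membership in changed_cols means it was genuinely set
            state[ev["ID"]][col] = ev.get(col)
            if col not in col_list[ev["ID"]]:
                col_list[ev["ID"]].append(col)
    return state, col_list

state, col_list = latest_changed(events)
```

This reproduces the expected rows above: state[244] ends with COLUMNA=None (a real data NULL from POS 4), COLUMNB="user", COLUMNC=None, COLUMND=35000.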
If I understand correctly, you have "correct" data only when the column name is present in changed_cols. One approach is a somewhat monstrous construct via max_by, which takes into account only rows where the column is present in the changed_cols array: an if expression returns pos when the analyzed column is in the array and NULL otherwise, and max_by ignores rows with NULL keys:
-- sample data
WITH dataset(ID, OUTSTANDING1, OUTSTANDING2, OUTSTANDINGP, LASTACTION, LASTACTIONDATE, pos, changed_cols) AS (
values (4952, 0.0, 0, 0, NULL ,NULL , 177240302252540, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
(4952, NULL, NULL, NULL, NULL ,'2022-12-15 13:30:02', 177240302252701, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
(4952, NULL, NULL, NULL, 'ERA' ,'2022-12-15 13:30:05', 177240302255887, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
(4959, 162.94, 0, 0, NULL ,NULL , 177240304204719, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
(4959, 160.0, 0, 0, NULL ,NULL , 177240304207643, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
(4959, 141.74, 0, 0, NULL ,NULL , 177240304209836, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
(4959, NULL, 0, 0, NULL ,NULL , 177240304212737, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
(4959, NULL, NULL, NULL, 'CLOSE' , '2022-12-15 13:30:07', 177240304212873, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
(4959, NULL, NULL, NULL, 'ERA' , '2022-12-15 13:30:07', 177240304218096, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
(4959, NULL, NULL, NULL, NULL ,NULL , 177240304219369, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
(4998, 25.0, 0, 0, NULL ,NULL , 177330411767496, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
(4998, NULL, NULL, NULL, 'CREATE' , '2022-12-15 16:41:06', 177330411767632, array['ID', 'LASTACTION', 'LASTACTIONDATE']),
(4998, 55.0, 0, 0, NULL ,NULL , 177330411775790, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP']),
(4998, 55.0, 0, 0, NULL ,NULL , 177330411781226, array['ID', 'OUTSTANDING1', 'OUTSTANDING2', 'OUTSTANDINGP'])
)
-- query
select ID,
max_by(OUTSTANDING1, if(contains(changed_cols, 'OUTSTANDING1'), pos)) OUTSTANDING1,
max_by(OUTSTANDING2, if(contains(changed_cols, 'OUTSTANDING2'), pos)) OUTSTANDING2,
max_by(LASTACTION, if(contains(changed_cols, 'LASTACTION'), pos)) LASTACTION,
    -- roll up the arrays (order by pos keeps the rollup deterministic):
    array_distinct(flatten(array_agg(changed_cols order by pos))) changed_cols
-- ... the rest of needed columns
from dataset
group by ID;
Output:
  ID  | OUTSTANDING1 | OUTSTANDING2 | LASTACTION
------+--------------+--------------+------------
 4952 | 0.00         | 0            | ERA
 4998 | 55.00        | 0            | CREATE
 4959 | NULL         | 0            | NULL
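To see why the if inside max_by does the right thing: aggregate functions skip rows whose key is NULL, so a row only competes for "latest" when the column actually appears in its changed_cols. Here is a Python emulation of that semantics for the OUTSTANDING1 column of ID 4959 (my own illustration, not Trino code):

```python
# Emulation of max_by(value, if(contains(changed_cols, col), pos)):
# the key is pos only when the column was genuinely changed, otherwise
# None, and None keys drop the row from the aggregation entirely.
rows = [  # (OUTSTANDING1, pos, changed_cols) for ID 4959
    (162.94, 177240304204719, ["ID", "OUTSTANDING1", "OUTSTANDING2", "OUTSTANDINGP"]),
    (160.0, 177240304207643, ["ID", "OUTSTANDING1", "OUTSTANDING2", "OUTSTANDINGP"]),
    (141.74, 177240304209836, ["ID", "OUTSTANDING1", "OUTSTANDING2", "OUTSTANDINGP"]),
    (None, 177240304212737, ["ID", "OUTSTANDING1", "OUTSTANDING2", "OUTSTANDINGP"]),
    (None, 177240304212873, ["ID", "LASTACTION", "LASTACTIONDATE"]),
    (None, 177240304218096, ["ID", "LASTACTION", "LASTACTIONDATE"]),
    (None, 177240304219369, ["ID", "LASTACTION", "LASTACTIONDATE"]),
]

def max_by(value_key_pairs):
    # like max_by(value, key): rows with a None key are ignored
    candidates = [(k, v) for v, k in value_key_pairs if k is not None]
    if not candidates:
        return None
    return max(candidates, key=lambda kv: kv[0])[1]

pairs = [(val, pos if "OUTSTANDING1" in cols else None)
         for val, pos, cols in rows]
result = max_by(pairs)
```

Here result is a real NULL: the data NULL at pos 177240302...212737 is the latest row whose changed_cols contains OUTSTANDING1, so it wins over 141.74, while the three trailing LASTACTION-only rows are excluded from the race.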