I have one base table, which is holding the actual data. below is the table structure
id | name | address | age | date |
---|---|---|---|---|
A1 | {"fname": "Alex", "lname": "Bhatt"} | {"lane": "Mac Street", "flat": ["24", "26", "27", "29"]} | 56 | 20201128 |
A2 | {"fname": "Bob", "lname": "Natarajan"} | {"lane": "Royd Street", "flat": ["22", "23", "27"], "pin": "123514"} | 53 | 20201123 |
A1 | {"fname": "Alex", "lname": "Bhattacharya"} | {"lane": "Mac Street", "flat": ["24", "26", "27", "29"]} | 56 | 20201228 |
A2 | {"fname": "Bob", "lname": "Natarajan"} | {"lane": "Royd Street", "flat": ["22", "24", "27"], "pin": "123514"} | 53 | 20201228 |
In the above table for A1 and A2 there is change in data. This changed data summary is captured and provided by another table. The table structure is mentioned below.
id | changed_field | date |
---|---|---|
A1 | name.lname | 20201228 |
A2 | address.flat[1] | 20201228 |
From the above 2 table, I have to prepare the final table where the detail of changed data will be captured. Below is the expected table.
id | changed_field | new_value | newdate | old_value | olddate |
---|---|---|---|---|---|
A1 | name.lname | Bhattacharya | 20201228 | Bhatt | 20201128 |
A2 | address.flat[1] | 24 | 20201228 | 23 | 20201123 |
I have tried with spark sql functions get_json_object() but it is not working. Any suggestion will be really helpful
I think you need to create another json column in order to use get_json_object
... see my answer below.
import pyspark.sql.functions as F
result = df1.select(
'id',
'date',
F.to_json(
F.struct(
F.from_json('name', 'fname string, lname string').alias('name'),
F.from_json('address', 'lane string, flat array<string>, pin string').alias('address')
)
).alias('jsoncol')
).join(
df2.withColumnRenamed('date', 'date2'), 'id'
).withColumn(
'new_value',
F.expr("get_json_object(jsoncol, '$.' || changed_field)")
).groupBy('id', 'changed_field').agg(
F.array_sort(
F.collect_list(
F.array('date', 'new_value')
)
).alias('values')
).select(
'id',
'changed_field',
F.col('values')[1][1].alias('new_value'),
F.col('values')[1][0].alias('newdate'),
F.col('values')[0][1].alias('old_value'),
F.col('values')[0][0].alias('olddate')
)
result.show(truncate=False)
+---+---------------+------------+--------+---------+--------+
|id |changed_field |new_value |newdate |old_value|olddate |
+---+---------------+------------+--------+---------+--------+
|A1 |name.lname |Bhattacharya|20201228|Bhatt |20201128|
|A2 |address.flat[1]|24 |20201228|23 |20201123|
+---+---------------+------------+--------+---------+--------+