
PySpark change data capture implementation


I have one base table that holds the actual data. Below is the table structure:

id | name | address | age | date
A1 | {"fname": "Alex", "lname": "Bhatt"} | {"lane": "Mac Street", "flat": ["24", "26", "27", "29"]} | 56 | 20201128
A2 | {"fname": "Bob", "lname": "Natarajan"} | {"lane": "Royd Street", "flat": ["22", "23", "27"], "pin": "123514"} | 53 | 20201123
A1 | {"fname": "Alex", "lname": "Bhattacharya"} | {"lane": "Mac Street", "flat": ["24", "26", "27", "29"]} | 56 | 20201228
A2 | {"fname": "Bob", "lname": "Natarajan"} | {"lane": "Royd Street", "flat": ["22", "24", "27"], "pin": "123514"} | 53 | 20201228

In the table above, the data for A1 and A2 has changed. A summary of these changes is captured in another table, whose structure is shown below. Each changed_field starts with the column name (name or address), followed by the path inside that column's JSON.

id | changed_field | date
A1 | name.lname | 20201228
A2 | address.flat[1] | 20201228

From these two tables, I need to build a final table that captures the details of each change. This is the expected table:

id | changed_field | new_value | newdate | old_value | olddate
A1 | name.lname | Bhattacharya | 20201228 | Bhatt | 20201128
A2 | address.flat[1] | 24 | 20201228 | 23 | 20201123

I tried the Spark SQL function get_json_object(), but it did not work. Any suggestions would be really helpful.
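To pin down the expected semantics, here is a plain-Python sketch (no Spark) that reproduces the expected table from the sample data. It assumes dates are yyyyMMdd strings, that the new value comes from the latest row dated on or before the change date, and that the old value comes from the row just before it. `lookup` and `capture_changes` are hypothetical helper names used only for illustration.

```python
import json
import re

# Sample base table from the question: (id, name_json, address_json, date)
base_rows = [
    ("A1", '{"fname": "Alex", "lname": "Bhatt"}',
     '{"lane": "Mac Street", "flat": ["24", "26", "27", "29"]}', "20201128"),
    ("A2", '{"fname": "Bob", "lname": "Natarajan"}',
     '{"lane": "Royd Street", "flat": ["22", "23", "27"], "pin": "123514"}', "20201123"),
    ("A1", '{"fname": "Alex", "lname": "Bhattacharya"}',
     '{"lane": "Mac Street", "flat": ["24", "26", "27", "29"]}', "20201228"),
    ("A2", '{"fname": "Bob", "lname": "Natarajan"}',
     '{"lane": "Royd Street", "flat": ["22", "24", "27"], "pin": "123514"}', "20201228"),
]
# Change summary table: (id, changed_field, date)
changes = [("A1", "name.lname", "20201228"), ("A2", "address.flat[1]", "20201228")]


def lookup(doc, path):
    """Walk a dotted path with optional [index] suffixes, e.g. 'address.flat[1]'."""
    for key, index in re.findall(r"(\w+)(?:\[(\d+)\])?", path):
        doc = doc[key]
        if index:
            doc = doc[int(index)]
    return doc


def capture_changes(base_rows, changes):
    # Nest each row's JSON columns under their column names, so that a
    # changed_field such as "name.lname" is a path into one document.
    versions = {}
    for id_, name, address, date in base_rows:
        doc = {"name": json.loads(name), "address": json.loads(address)}
        versions.setdefault(id_, []).append((date, doc))

    result = []
    for id_, field, change_date in changes:
        # yyyyMMdd strings sort chronologically; ignore rows after the change.
        history = sorted((v for v in versions[id_] if v[0] <= change_date),
                         key=lambda v: v[0])
        (old_date, old_doc), (new_date, new_doc) = history[-2], history[-1]
        result.append((id_, field, lookup(new_doc, field), new_date,
                       lookup(old_doc, field), old_date))
    return result


for row in capture_changes(base_rows, changes):
    print(row)
```

Nesting the parsed columns under their own names is what lets the whole changed_field act as a single path, which is also the idea behind the answer below.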


Solution

  • I think you need to build an extra JSON column that combines name and address, so that each changed_field value (for example name.lname) becomes a valid path for get_json_object. See my answer below.

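    import pyspark.sql.functions as F
    
    # df1: base table (id, name, address, age, date); name and address are JSON strings
    # df2: change summary (id, changed_field, date)
    result = df1.select(
        'id',
        'date',
        # Parse both JSON columns and nest them in one JSON document keyed by
        # column name, e.g. {"name": {...}, "address": {...}}, so that a
        # changed_field such as "name.lname" maps to the path "$.name.lname"
        F.to_json(
            F.struct(
                F.from_json('name', 'fname string, lname string').alias('name'),
                F.from_json('address', 'lane string, flat array<string>, pin string').alias('address')
            )
        ).alias('jsoncol')
    ).join(
        # Attach each change record to every version of the same id
        df2.withColumnRenamed('date', 'date2'), 'id'
    ).withColumn(
        'new_value',
        # Extract the changed field from each version with a per-row JSON path
        F.expr("get_json_object(jsoncol, '$.' || changed_field)")
    ).groupBy('id', 'changed_field').agg(
        # Collect [date, value] pairs and sort them; yyyyMMdd dates sort chronologically
        F.array_sort(
            F.collect_list(
                F.array('date', 'new_value')
            )
        ).alias('values')
    ).select(
        'id',
        'changed_field',
        # Assumes exactly two versions per id: index 0 is the old one, index 1 the new one
        F.col('values')[1][1].alias('new_value'),
        F.col('values')[1][0].alias('newdate'),
        F.col('values')[0][1].alias('old_value'),
        F.col('values')[0][0].alias('olddate')
    )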
    
    result.show(truncate=False)
    +---+---------------+------------+--------+---------+--------+
    |id |changed_field  |new_value   |newdate |old_value|olddate |
    +---+---------------+------------+--------+---------+--------+
    |A1 |name.lname     |Bhattacharya|20201228|Bhatt    |20201128|
    |A2 |address.flat[1]|24          |20201228|23       |20201123|
    +---+---------------+------------+--------+---------+--------+