
PySpark subtract very large dataframes


I need to extract data from two Hive tables, which are very large. They are in two different schemas but have the same definition.

I need to compare the two tables and identify the following in PySpark:

  1. rows that are present in table1, but missing in table2
  2. rows that are present in both tables, but have a mismatch in values in any of the non-key columns
  3. rows that are present in table2, but missing in table1

For example, let's say the tables have the following columns:

ProductId - BigInteger - PK
ProductVersion - int - PK
ProductName - char
ProductPrice - decimal
ProductDesc - varchar

Let's say the data is as follows:

Table1 in Schema1
[1, 1, "T-Shirt", 10.50, "Soft-Washed Slub-Knit V-Neck"] -> Matches with Table2
[1, 2, "T-Shirt", 10.50, "Soft-Washed Striped Crew-Neck "] -> Missing in Table2
[2, 1, "Short Sleeve Shirt", 10.50, "Everyday Printed Short-Sleeve Shirt"] -> ProductPrice is different in Table2
[3, 1, "T-Shirt", 10.50, "Breathe ON Camo Tee"] -> ProductDesc is different in Table2

Table2 in Schema2
[1, 1, "T-Shirt", 10.50, "Soft-Washed Slub-Knit V-Neck"] -> Matches with Table1
[2, 1, "Short Sleeve Shirt", 12.50, "Everyday Printed Short-Sleeve Shirt"] -> ProductPrice is different in Table1
[3, 1, "T-Shirt", 10.50, "Breathe ON Camo"] -> ProductDesc is different in Table1
[3, 2, "T-Shirt", 20, "Breathe ON ColorBlock Tee"] -> Missing in Table1
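
For reference, the two tables above can be reproduced locally as DataFrames like this (a sketch for testing only; in the real scenario the data comes from the Hive tables):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    columns = ["ProductId", "ProductVersion", "ProductName", "ProductPrice", "ProductDesc"]

    df1 = spark.createDataFrame([
        (1, 1, "T-Shirt", 10.50, "Soft-Washed Slub-Knit V-Neck"),
        (1, 2, "T-Shirt", 10.50, "Soft-Washed Striped Crew-Neck "),
        (2, 1, "Short Sleeve Shirt", 10.50, "Everyday Printed Short-Sleeve Shirt"),
        (3, 1, "T-Shirt", 10.50, "Breathe ON Camo Tee"),
    ], columns)

    df2 = spark.createDataFrame([
        (1, 1, "T-Shirt", 10.50, "Soft-Washed Slub-Knit V-Neck"),
        (2, 1, "Short Sleeve Shirt", 12.50, "Everyday Printed Short-Sleeve Shirt"),
        (3, 1, "T-Shirt", 10.50, "Breathe ON Camo"),
        (3, 2, "T-Shirt", 20.0, "Breathe ON ColorBlock Tee"),  # 20 as a float so the column type is consistent
    ], columns)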

The expected result will be three separate DataFrames:

  1. dfOut1 - will contain the rows that are present in table1, but missing in table2, based on the primary key
["Missing in Table2", [1, 2, "T-Shirt", 10.50, "Soft-Washed Striped Crew-Neck "]]

The first column will indicate the difference type. If the difference type is "Missing in Table1" or "Missing in Table2", the entire row from the source table will be available in the output.

  2. dfDiff - will contain, for each row present in both tables with mismatched values, one row per differing non-key column
["Difference", "ProductPrice", 2, 1, 10.50, 12.50]
["Difference", "ProductDesc", 3, 1, "Breathe ON Camo Tee", "Breathe ON Camo"]

  3. dfOut2 - will contain the rows that are present in table2, but missing in table1, based on the primary key
["Missing in Table1", [3, 2, "T-Shirt", 20, "Breathe ON ColorBlock Tee"]]

I am thinking of the following approach:

1. Create df1 from table1 using query "select * from schema1.table1"
2. Create df2 from table2 using query "select * from schema2.table2"
3. Use df1.except(df2)

I referred to the documentation.

I am not sure if this approach will work. Will df1.except(df2) compare all the fields, or just the key columns?

Also, I am not sure how to separate the output further.


Solution

  • You are basically trying to find inserts, updates, and deletes (the deltas) between two datasets. Here is one generic solution for such deltas:

    from pyspark.sql.functions import sha2, concat_ws

    # comma-separated primary-key columns (not defined in the original
    # snippet); for the example above:
    keys = "ProductId,ProductVersion"

    # split the comma-separated keys into a normalized list
    key_column_list = [x.strip().lower() for x in keys.split(',')]
    # name of the change-indicator column added to the output
    changeindicator = "chg_id"
    df_compare_curr_df = spark.sql("select * from schema1.table1")
    df_compare_prev_df = spark.sql("select * from schema2.table2")
    # column lists for each side (captured before the hash columns are added)
    currentcolumns = df_compare_curr_df.columns
    previouscolumns = df_compare_prev_df.columns
    # hash the full row and the key columns separately, so the comparison
    # stays generic and works for any kind of delta
    df_compare_curr_df = df_compare_curr_df.withColumn("all_hash_val", sha2(concat_ws("||", *currentcolumns), 256))
    df_compare_curr_df = df_compare_curr_df.withColumn("key_val", sha2(concat_ws("||", *key_column_list), 256))
    df_compare_prev_df = df_compare_prev_df.withColumn("key_val", sha2(concat_ws("||", *key_column_list), 256))
    df_compare_prev_df = df_compare_prev_df.withColumn("all_hash_val", sha2(concat_ws("||", *previouscolumns), 256))
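    # note: concat_ws skips NULLs, so ("a", NULL) and (NULL, "a") produce the
    # same hash; if NULLs are possible, coalesce each column to a sentinel first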
    df_compare_curr_df.createOrReplaceTempView("NewTable")
    df_compare_prev_df.createOrReplaceTempView("OldTable")
    # build the delta SQL: left outer joins find missing rows, inner joins find changes
    insert_sql = "select 'I' as " + changeindicator + ",A.* from NewTable A left outer join OldTable B on A.key_val = B.key_val where B.key_val is NULL"
    update_sql = "select 'U' as " + changeindicator + ",A.* from NewTable A inner join OldTable B on A.key_val = B.key_val where A.all_hash_val != B.all_hash_val"
    delete_sql = "select 'D' as  " + changeindicator + ",A.* from OldTable A left outer join NewTable B on A.key_val = B.key_val where B.key_val is NULL"
    nochange_sql = "select 'N' as  " + changeindicator + ",A.* from OldTable A inner join NewTable B on A.key_val = B.key_val where A.all_hash_val = B.all_hash_val"
    upsert_sql = insert_sql + " union " + update_sql
    all_changes_sql = insert_sql + " union " + update_sql + " union " + delete_sql
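    # note: SQL "union" de-duplicates (UNION DISTINCT); "union all" would be
    # cheaper here and is safe, since the change indicators differ per branch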
    
    df_compare_updates = spark.sql(update_sql)
    df_compare_inserts = spark.sql(insert_sql)
    df_compare_deletes = spark.sql(delete_sql)
    df_compare_upserts = spark.sql(upsert_sql)
    df_compare_changes = spark.sql(all_changes_sql)
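
To answer the except() question: in PySpark the method is df1.subtract(df2) (or df1.exceptAll(df2) if duplicates matter), and it compares entire rows, not just the key columns. A row whose non-key values changed would therefore show up in both subtractions, with no way to tell "changed" apart from "missing". That is exactly why the snippet above hashes the key columns and the full row separately.

The generic deltas can then be reshaped into the three DataFrames from the question. Below is a sketch, assuming the variables from the snippet above are still in scope; the names dfOut1, dfDiff, and dfOut2 come from the question, while diff_type, column_name, table1_value, and table2_value are illustrative:

    from functools import reduce
    from pyspark.sql import functions as F

    # rows of table1 missing in table2, and vice versa; withColumn replaces
    # the 'I'/'D' indicator with the labels the question asks for
    dfOut1 = df_compare_inserts.withColumn(changeindicator, F.lit("Missing in Table2"))
    dfOut2 = df_compare_deletes.withColumn(changeindicator, F.lit("Missing in Table1"))

    # for dfDiff, re-join the two tables on the key hash and emit one row
    # per non-key column whose values differ
    non_key_columns = [c for c in currentcolumns if c.lower() not in key_column_list]
    joined = (df_compare_curr_df.alias("n")
              .join(df_compare_prev_df.alias("o"), on="key_val")
              .where(F.col("n.all_hash_val") != F.col("o.all_hash_val")))

    diff_frames = [
        joined.where(~F.col("n." + c).eqNullSafe(F.col("o." + c)))
              .select(F.lit("Difference").alias("diff_type"),
                      F.lit(c).alias("column_name"),
                      *[F.col("n." + k) for k in key_column_list],
                      # cast to string so differently typed columns union cleanly
                      F.col("n." + c).cast("string").alias("table1_value"),
                      F.col("o." + c).cast("string").alias("table2_value"))
        for c in non_key_columns
    ]
    dfDiff = reduce(lambda a, b: a.unionByName(b), diff_frames)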