python apache-spark bigdata apache-iceberg

Is there a better way to check for data loss and mismatched data after flink-cdc has streamed all my data from a database to a data lake?


I have a MySQL database with 10 TB of data, and I have streamed all of it to S3 in the Iceberg table format using flink-cdc.

I want to check whether there is data loss, or whether there are mismatched data values.

Currently, I have two solutions using PySpark.

# Initialize the two tables in Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, md5

spark = SparkSession.builder.getOrCreate()

table_name = 'target_table'
df_iceberg_table = read_iceberg_table_using_spark(table_name)
df_mysql_table = read_mysql_table_using_spark(table_name)
table_columns = get_table_columns(table_name)

The first is to calculate the hash value for every row and compare them to check whether data is the same.

df_mysql_table_hash = (
    df_mysql_table
        .select(
            col('id'),
            md5(concat_ws('|', *table_columns)).alias('hash')
        )
)

df_iceberg_table_hash = (
    df_iceberg_table
        .select(
            col('id'),
            md5(concat_ws('|', *table_columns)).alias('hash')
        )
)

df_mysql_table_hash.createOrReplaceTempView('mysql_table_hash')
df_iceberg_table_hash.createOrReplaceTempView('iceberg_table_hash')

df_diff = spark.sql('''
    select 
        d1.id as mysql_id, 
        d2.id as iceberg_id, 
        d1.hash as mysql_hash, 
        d2.hash as iceberg_hash
    from mysql_table_hash d1
    left outer join iceberg_table_hash d2 on d1.id = d2.id
    where false
        or d2.id is null
        or d1.hash <> d2.hash
''')

# Save df_diff somewhere
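
One caveat with the hashing approach is worth illustrating: Spark's concat_ws skips NULL values, so two rows whose NULLs sit in different columns can concatenate to the same string and produce the same hash. The sketch below models that behavior in plain Python with hashlib rather than calling Spark; the function names and the `<NULL>` placeholder are assumptions made up for this example.

```python
import hashlib


def row_hash_skip_nulls(values, sep="|"):
    """Model of md5(concat_ws(sep, *cols)): None values are dropped before joining."""
    joined = sep.join(str(v) for v in values if v is not None)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


def row_hash_null_safe(values, sep="|", null_token="<NULL>"):
    """Replace each None with an explicit placeholder so its column position is kept."""
    joined = sep.join(null_token if v is None else str(v) for v in values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


# Same id, but the NULL sits in a different column in each row.
row_a = (1, "x", None)
row_b = (1, None, "x")

# Dropping NULLs makes both rows join to "1|x", so the hashes collide.
collision = row_hash_skip_nulls(row_a) == row_hash_skip_nulls(row_b)

# With a placeholder the joined strings differ, so the hashes differ too.
distinct = row_hash_null_safe(row_a) != row_hash_null_safe(row_b)

print(collision, distinct)
```

In PySpark, one way to get the null-safe behavior would be to wrap each column in coalesce(col(c).cast('string'), lit('<NULL>')) before passing it to concat_ws, assuming the placeholder cannot occur as a real value in the data.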

The second is to use the subtract() function in PySpark.

df_diff = df_mysql_table.subtract(df_iceberg_table)

# Save df_diff somewhere

Which one is better and faster? Is there a better way to achieve this?


Solution

  • Similar to subtract(), you can use the exceptAll() method to find the differences between the two DataFrames. I find it especially useful in unit tests. Note that df1.exceptAll(df2) only returns the rows of df1 that have no counterpart in df2, so an empty result means every row of df1 (duplicates included) also appears in df2, not that the two DataFrames are identical. To confirm that they hold the same rows, run the check in both directions.

    # Rows in df1 with no matching row in df2 (duplicates are counted)
    missing_in_df2 = df1.exceptAll(df2)
    # Rows in df2 with no matching row in df1
    missing_in_df1 = df2.exceptAll(df1)
    
    # Both directions must be empty for the DataFrames to contain the same rows
    if missing_in_df2.count() == 0 and missing_in_df1.count() == 0:
        print("DataFrames are identical.")
    else:
        print("DataFrames have differences.")
        missing_in_df2.show()
        missing_in_df1.show()
    

    Also note that neither method takes row order into account. They differ in how they treat duplicates: exceptAll() behaves like SQL EXCEPT ALL and preserves duplicate rows, while subtract() behaves like SQL EXCEPT DISTINCT and returns only distinct rows. With subtract(), you therefore cannot detect duplicated rows, so it is only sufficient when duplicates do not matter. Any performance difference between the two depends on the data and is better measured than assumed.
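
To illustrate how the two operations differ on duplicate rows, here is a plain-Python model of their semantics: a set difference for subtract() and a multiset difference for exceptAll(). It does not call Spark, and the helper names except_distinct and except_all are made up for this sketch.

```python
from collections import Counter


def except_distinct(left, right):
    """Model of subtract() / EXCEPT DISTINCT: distinct rows of left not present in right."""
    right_set = set(right)
    return sorted({row for row in left if row not in right_set})


def except_all(left, right):
    """Model of exceptAll() / EXCEPT ALL: multiset difference, duplicates preserved."""
    remaining = Counter(left) - Counter(right)
    return sorted(remaining.elements())


# The source holds one row twice; the target holds it only once.
source = [(1, "a"), (1, "a"), (2, "b")]
target = [(1, "a"), (2, "b")]

print(except_distinct(source, target))  # [] : the extra duplicate goes unnoticed
print(except_all(source, target))       # [(1, 'a')] : the extra duplicate is reported
```

In a source-versus-lake comparison, this is the case where a row was written a different number of times on one side: the set-based check reports no difference, while the multiset-based check surfaces the surplus copy.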