I have a MySQL database with 10 TB of data, and I have streamed all of it to S3 in the Iceberg table format using flink-cdc.
I want to check whether any data was lost or whether any values are mismatched.
Currently, I have two solutions using PySpark.
# Initialize the two tables in Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, md5
spark = SparkSession.builder.getOrCreate()
table_name = 'target_table'
df_iceberg_table = read_iceberg_table_using_spark(table_name)
df_mysql_table = read_mysql_table_using_spark(table_name)
table_columns = get_table_columns(table_name)
The first is to calculate the hash value for every row and compare them to check whether data is the same.
df_mysql_table_hash = (
    df_mysql_table
    .select(
        col('id'),
        md5(concat_ws('|', *table_columns)).alias('hash')
    )
)
df_iceberg_table_hash = (
    df_iceberg_table
    .select(
        col('id'),
        md5(concat_ws('|', *table_columns)).alias('hash')
    )
)
df_mysql_table_hash.createOrReplaceTempView('mysql_table_hash')
df_iceberg_table_hash.createOrReplaceTempView('iceberg_table_hash')
df_diff = spark.sql('''
    select
        d1.id as mysql_id,
        d2.id as iceberg_id,
        d1.hash as mysql_hash,
        d2.hash as iceberg_hash
    from mysql_table_hash d1
    left outer join iceberg_table_hash d2 on d1.id = d2.id
    where false
        or d2.id is null
        or d1.hash <> d2.hash
''')
# Save df_diff somewhere
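One caveat with the hash approach: concat_ws() skips NULL values, so two rows whose NULLs sit in different columns can produce the same hash. The sketch below models this in plain Python rather than Spark. The helpers `concat_ws_like`, `naive_row_hash`, and `null_safe_row_hash` are hypothetical names written for illustration, and the `<NULL>` sentinel is an assumption that only works if it never occurs as a real value.

```python
import hashlib


def concat_ws_like(sep, values):
    """Model of Spark's concat_ws: NULL (None) values are skipped."""
    return sep.join(str(v) for v in values if v is not None)


def naive_row_hash(values):
    """MD5 over the joined values, mirroring md5(concat_ws('|', ...))."""
    return hashlib.md5(concat_ws_like("|", values).encode("utf-8")).hexdigest()


def null_safe_row_hash(values, null_token="<NULL>"):
    """Replace None with a sentinel so each column keeps its position."""
    # Assumption: null_token never appears as a real value in the data.
    encoded = [null_token if v is None else str(v) for v in values]
    return hashlib.md5("|".join(encoded).encode("utf-8")).hexdigest()


row_a = (1, "x", None)
row_b = (1, None, "x")

print(naive_row_hash(row_a) == naive_row_hash(row_b))          # True
print(null_safe_row_hash(row_a) == null_safe_row_hash(row_b))  # False
```

In PySpark, the same idea would likely be to wrap each column as coalesce(col(c).cast('string'), lit('<NULL>')) before passing it to concat_ws(). Note also that the left outer join above only finds rows missing or changed on the Iceberg side; a full outer join would be needed to catch rows that exist only in Iceberg.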
The second is to use the subtract() function in PySpark:
df_diff = df_mysql_table.subtract(df_iceberg_table)
# Save df_diff somewhere
Which one is better and faster? Is there a better way to achieve this?
Similar to subtract(), you can also use the exceptAll() method to find the differences between two DataFrames. I find it especially useful in unit tests. If df1.exceptAll(df2) returns an empty DataFrame, every row of df1 (counting duplicates) also appears in df2; if the same holds in the other direction, the two DataFrames contain the same rows. Otherwise, the non-empty DataFrame returned by exceptAll() contains the rows of df1 that have no match in df2.
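# Find the differences between df1 and df2 in both directions
diff_1_to_2 = df1.exceptAll(df2)  # rows in df1 with no match in df2
diff_2_to_1 = df2.exceptAll(df1)  # rows in df2 with no match in df1
# The DataFrames hold the same rows only if both differences are empty
if diff_1_to_2.count() == 0 and diff_2_to_1.count() == 0:
    print("DataFrames are identical.")
else:
    print("DataFrames have differences.")
    diff_1_to_2.show()
    diff_2_to_1.show()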
Also note that neither method depends on row order. exceptAll() behaves like SQL EXCEPT ALL and preserves duplicates, so it can detect a row that appears a different number of times in the two DataFrames. subtract() behaves like SQL EXCEPT DISTINCT and returns only distinct rows, so it cannot detect duplicated rows, if any. If performance is critical and duplicates do not matter, subtract() may be the more efficient choice.
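To make the difference in duplicate handling concrete, here is a small plain-Python model of the two operations (not Spark code). The helper names `subtract_like` and `except_all_like` are hypothetical, and rows are modeled as tuples.

```python
from collections import Counter


def subtract_like(rows1, rows2):
    """Model of df1.subtract(df2): distinct rows of rows1 absent from rows2."""
    return set(rows1) - set(rows2)


def except_all_like(rows1, rows2):
    """Model of df1.exceptAll(df2): multiset difference, duplicates kept."""
    remaining = Counter(rows1) - Counter(rows2)  # keeps only positive counts
    return sorted(remaining.elements())


source = [(1, "a"), (1, "a"), (2, "b")]  # row (1, "a") appears twice
target = [(1, "a"), (2, "b")]

print(subtract_like(source, target))    # set()
print(except_all_like(source, target))  # [(1, 'a')]
```

In this model, the set-based difference reports nothing, while the multiset difference surfaces the extra copy of (1, "a").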