Tags: python, dataframe, pyspark, apache-spark-sql

Compare a PySpark dataframe to another dataframe


I have two dataframes to compare. Both have the same number of columns, and the comparison result should contain the mismatching field, the actual and expected values, and the row's ID.

Dataframe one

+-----+---+--------+
| name| id|    City|
+-----+---+--------+
|  Sam|  3| Toronto|
| BALU| 11|     YYY|
|CLAIR|  7|Montreal|
|HELEN| 10|  London|
|HELEN| 16|  Ottawa|
+-----+---+--------+

Dataframe two

+-------------+-----------+-------------+
|Expected_name|Expected_id|Expected_City|
+-------------+-----------+-------------+
|          SAM|          3|      Toronto|
|         BALU|         11|          YYY|
|        CLARE|          7|     Montreal|
|        HELEN|         10|        Londn|
|        HELEN|         15|       Ottawa|
+-------------+-----------+-------------+

Expected Output

+---+------------+--------------+-----+
| ID|Actual_value|Expected_value|Field|
+---+------------+--------------+-----+
|  7|       CLAIR|         CLARE| name|
|  3|         Sam|           SAM| name|
| 10|      London|         Londn| City|
+---+------------+--------------+-----+

Code

Create example data

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")  # show error-level logs only

df_Actual = spark.createDataFrame(
    [("Sam", 3, 'Toronto'), ("BALU", 11, 'YYY'), ("CLAIR", 7, 'Montreal'),
     ("HELEN", 10, 'London'), ("HELEN", 16, 'Ottawa')],
    ["name", "id", "City"]
)

df_Expected = spark.createDataFrame(
    [("SAM", 3, 'Toronto'), ("BALU", 11, 'YYY'), ("CLARE", 7, 'Montreal'),
     ("HELEN", 10, 'Londn'), ("HELEN", 15, 'Ottawa')],
    ["Expected_name", "Expected_id", "Expected_City"]
)

Create an empty dataframe for the result

field = [
    StructField("ID", StringType(), True),
    StructField("Actual_value", StringType(), True),
    StructField("Expected_value", StringType(), True),
    StructField("Field", StringType(), True)
]

schema = StructType(field)
df_result = spark.createDataFrame([], schema)
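
All four result columns are strings so that each per-column slice, whose actual and expected values may come from differently typed source columns (for example the bigint id), can be unioned into a single frame. A quick sanity check of the empty frame's layout:

df_result.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Actual_value: string (nullable = true)
 |-- Expected_value: string (nullable = true)
 |-- Field: string (nullable = true)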

Join expected and actual on id

df_combined = df_Actual.join(df_Expected, df_Actual.id == df_Expected.Expected_id)

col_names = df_Actual.schema.names

Loop through each column to find mismatches

for col_name in col_names:

    # Keep only rows where this column's actual and expected values differ
    # (the 'id' pass never yields rows, since the join already matched on id)
    df_comp = df_combined.filter(col(col_name) != col("Expected_" + col_name))\
        .select(col('id'), col(col_name), col("Expected_" + col_name))

    # Record which column mismatched
    df_comp = df_comp.withColumn("Field", lit(col_name))

    # Append to the final result
    df_result = df_result.union(df_comp)

df_result.show()

This code works as expected. However, in the real case I have many more columns and millions of rows to compare, and this approach takes a long time to finish. Is there a better way to improve the performance and get the same result?


Solution

  • For those who are looking for an answer: I transposed the dataframes and then did the comparison.

    from pyspark.sql.functions import array, col, explode, struct, lit

    def Transposedf(df, by, colheader):

        # Filter dtypes and split into column names and type descriptions
        cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))

        # Spark SQL supports only homogeneous columns inside an array
        assert len(set(dtypes)) == 1, "All columns have to be of the same type"

        # Create and explode an array of (column_name, column_value) structs
        kvs = explode(array([
            struct(lit(c).alias("Field"), col(c).alias(colheader)) for c in cols
        ])).alias("kvs")

        return df.select(by + [kvs]).select(by + ["kvs.Field", "kvs." + colheader])
    
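
    With the example data above, the transposed actual frame should look like this (one row per id/column pair; row order may vary):

    Transposedf(df_Actual, ["id"], 'Actual_value').show()

    +---+-----+------------+
    | id|Field|Actual_value|
    +---+-----+------------+
    |  3| name|         Sam|
    |  3| City|     Toronto|
    | 11| name|        BALU|
    | 11| City|         YYY|
    |  7| name|       CLAIR|
    |  7| City|    Montreal|
    | 10| name|       HELEN|
    | 10| City|      London|
    | 16| name|       HELEN|
    | 16| City|      Ottawa|
    +---+-----+------------+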

    Then the comparison looks like this

    def Compare_df(df_Expected, df_Actual):
        # Join on id and field, keeping only rows where the values differ
        df_combined = (df_Actual
            .join(df_Expected, ((df_Actual.id == df_Expected.id)
                                & (df_Actual.Field == df_Expected.Field)
                                & (df_Actual.Actual_value != df_Expected.Expected_value)))
            .select([df_Actual.id, df_Actual.Field, df_Actual.Actual_value, df_Expected.Expected_value])
        )
        return df_combined
    

    I called these two functions as follows:

    df_Actual = Transposedf(df_Actual, ["id"], 'Actual_value')
    df_Expected = Transposedf(df_Expected, ["id"], 'Expected_value')

    # Compare the expected and actual
    df_result = Compare_df(df_Expected, df_Actual)
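
    With the example data, df_result.show() should then reproduce the mismatches from the question (column order follows the select in Compare_df; row order may vary):

    df_result.show()

    +---+-----+------------+--------------+
    | id|Field|Actual_value|Expected_value|
    +---+-----+------------+--------------+
    |  3| name|         Sam|           SAM|
    |  7| name|       CLAIR|         CLARE|
    | 10| City|      London|         Londn|
    +---+-----+------------+--------------+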