Tags: dataframe, apache-spark, apache-spark-sql, apache-spark-dataset

spark data frame compare and show only values that are different


I have two dataframes to compare, and I am using `except` to show the rows present in the first dataset but missing from the second. It works fine, but I want to display only the values that are different instead of the entire row, so it is easy for someone to identify which fields differ.

Below is the code snippet:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test6").getOrCreate()

    val schemaOrig = List(
      StructField("key", StringType, true),
      StructField("name", StringType, true),
      StructField("start_ts", TimestampType, true),
      StructField("txn_dt", StringType, true))

    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(
        Row("1", "john", java.sql.Timestamp.valueOf("2018-10-16 00:00:00"), "2020-02-14"))),
      StructType(schemaOrig))

    val df2 = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(
        Row("1", "andrew", java.sql.Timestamp.valueOf("2017-10-16 00:00:00"), "2020-02-14"))),
      StructType(schemaOrig))

    df.except(df2).show(false)

+---+----+-------------------+----------+
|key|name|start_ts           |txn_dt    |
+---+----+-------------------+----------+
|1  |john|2018-10-16 00:00:00|2020-02-14|
+---+----+-------------------+----------+
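The reason `except` cannot do what I want is that it is a row-level set difference: a row from the first dataset is kept only if no identical row exists in the second, so it can only ever return whole rows, never the individual fields that differ. A minimal plain-Scala sketch of the same semantics (modelling each row as a tuple, with hypothetical data, no Spark involved):

```scala
object ExceptSketch {
  type Row = (String, String, String, String) // key, name, start_ts, txn_dt

  // Whole-row set difference, like df.except(df2): keep rows of `a`
  // that have no exact duplicate in `b`.
  def except(a: Seq[Row], b: Seq[Row]): Seq[Row] = {
    val bSet = b.toSet
    a.distinct.filterNot(bSet.contains)
  }

  def main(args: Array[String]): Unit = {
    val dfa = Seq(("1", "john", "2018-10-16 00:00:00", "2020-02-14"))
    val dfb = Seq(("1", "andrew", "2017-10-16 00:00:00", "2020-02-14"))
    // The entire differing row comes back, not just the changed fields.
    println(except(dfa, dfb))
  }
}
```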

EXPECTED OUTPUT

+---+-------------+------------------------+
|key|diff columns |diff values             |
+---+-------------+------------------------+
|1  |name,start_ts|john,2018-10-16 00:00:00|
+---+-------------+------------------------+

Solution

  • Use a full outer join and extract the columns that do not match.

    Please check the code below.

    scala> dfa.printSchema
    root
     |-- key: string (nullable = true)
     |-- name: string (nullable = true)
     |-- start_ts: timestamp (nullable = true)
     |-- txn_dt: string (nullable = true)
    
    
    scala> dfa.show(false)
    +---+----+-------------------+----------+
    |key|name|start_ts           |txn_dt    |
    +---+----+-------------------+----------+
    |1  |john|2018-10-16 00:00:00|2020-02-14|
    +---+----+-------------------+----------+
    
    
    scala> dfb.printSchema
    root
     |-- key: string (nullable = true)
     |-- name: string (nullable = true)
     |-- start_ts: timestamp (nullable = true)
     |-- txn_dt: string (nullable = true)
    
    
    scala> dfb.show(false)
    +---+------+-------------------+----------+
    |key|name  |start_ts           |txn_dt    |
    +---+------+-------------------+----------+
    |1  |andrew|2017-10-16 00:00:00|2020-02-14|
    +---+------+-------------------+----------+
    
    
    scala> val diff_cols = dfa.columns.filterNot(_ == "key").map(c => when(dfa(c) =!= dfb(c),c))
    diff_cols: Array[org.apache.spark.sql.Column] = Array(CASE WHEN (NOT (name = name)) THEN name END, CASE WHEN (NOT (start_ts = start_ts)) THEN start_ts END, CASE WHEN (NOT (txn_dt = txn_dt)) THEN txn_dt END)
    
    scala> val diff_values = dfa.columns.filterNot(_ == "key").map(c => when(dfa(c) =!= dfb(c),dfa(c)))
    diff_values: Array[org.apache.spark.sql.Column] = Array(CASE WHEN (NOT (name = name)) THEN name END, CASE WHEN (NOT (start_ts = start_ts)) THEN start_ts END, CASE WHEN (NOT (txn_dt = txn_dt)) THEN txn_dt END)
    
    scala> dfa.join(dfb,dfa("key") === dfb("key"),"full").select(dfa("key"),concat_ws(",",diff_cols:_*).as("diff_columns"),concat_ws(",",diff_values:_*).as("diff_values")).show(false) // using full join & taking diff columns & values.
    +---+-------------+------------------------+
    |key|diff_columns |diff_values             |
    +---+-------------+------------------------+
    |1  |name,start_ts|john,2018-10-16 00:00:00|
    +---+-------------+------------------------+
    
    
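The per-column logic that the `when`/`concat_ws` expressions compute can be sketched in plain Scala without Spark, representing each row as an ordered sequence of (column, value) pairs. This is an illustrative sketch, not the answer's code; the `diff` helper and data are hypothetical:

```scala
object ColumnDiff {
  type Row = Seq[(String, String)] // ordered (column, value) pairs

  // For each non-key column, keep the column name and the left-hand value
  // wherever the two rows disagree -- the analogue of
  // when(dfa(c) =!= dfb(c), ...) combined with concat_ws(",", ...).
  def diff(a: Row, b: Row, key: String = "key"): (String, String) = {
    val bMap = b.toMap
    val changed = a.filterNot(_._1 == key).filter { case (c, v) => bMap.get(c) != Some(v) }
    (changed.map(_._1).mkString(","), changed.map(_._2).mkString(","))
  }

  def main(args: Array[String]): Unit = {
    val rowA = Seq("key" -> "1", "name" -> "john", "start_ts" -> "2018-10-16 00:00:00", "txn_dt" -> "2020-02-14")
    val rowB = Seq("key" -> "1", "name" -> "andrew", "start_ts" -> "2017-10-16 00:00:00", "txn_dt" -> "2020-02-14")
    println(ColumnDiff.diff(rowA, rowB))
  }
}
```

One caveat for the Spark version: `=!=` evaluates to null when either side is null, so a null-vs-value difference is silently dropped by `concat_ws`. Spark's null-safe equality operator `<=>` (e.g. `when(!(dfa(c) <=> dfb(c)), ...)`) avoids that if your columns can contain nulls.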