Search code examples
pythondataframepysparkdata-quality

pyspark - compare two String col and show the diff in new col


I am doing some data quality checking,
How do I compare two array columns ('old_unmatch' and 'current_unmatch') and create new columns for the results ('new_unmatch' and 'missed_unmatch')?

old_unmatch current_unmatch new_unmatch missed_unmatch
['121', '122'] ['121', '123'] ['123'] ['122']

Solution

  • To compare two array columns in PySpark and create new columns to show the differences, you can use a udf (User-Defined Function), or the built-in array_except function.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, StringType
    
    # Create (or reuse) a SparkSession
    spark = SparkSession.builder.getOrCreate()
    
    # Sample data: (old_unmatch, current_unmatch) pairs
    data = [
        (['121', '122'], ['121', '123']),
        (['124', '125'], ['124', '125']),
        (['126', '127'], ['127', '128'])
    ]
    
    # Create a DataFrame
    df = spark.createDataFrame(data, ['old_unmatch', 'current_unmatch'])
    
    # Return the elements of `left` that are absent from `right`.
    # NOTE: set difference does not preserve the original element order.
    def compare_arrays(left, right):
        return list(set(left) - set(right))
    
    # Register the UDF so it can be applied column-wise
    compare_arrays_udf = udf(compare_arrays, ArrayType(StringType()))
    
    # new_unmatch:    values present now but not before  -> current - old
    # missed_unmatch: values present before but gone now -> old - current
    # (argument order matters; swapping it inverts the semantics)
    df = df.withColumn('new_unmatch', compare_arrays_udf(df['current_unmatch'], df['old_unmatch']))
    df = df.withColumn('missed_unmatch', compare_arrays_udf(df['old_unmatch'], df['current_unmatch']))
    
    # Show the resulting DataFrame
    df.show(truncate=False)
    
    

    or

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import array_except
    
    # Obtain (or reuse) the active SparkSession
    spark = SparkSession.builder.getOrCreate()
    
    # Sample rows of (old_unmatch, current_unmatch)
    rows = [
        (['121', '122'], ['121', '123']),
        (['124', '125'], ['124', '125']),
        (['126', '127'], ['127', '128'])
    ]
    
    # Build the DataFrame
    df = spark.createDataFrame(rows, ['old_unmatch', 'current_unmatch'])
    
    # array_except(a, b) keeps the elements of a that do not appear in b:
    #   new_unmatch    -> in current_unmatch but not in old_unmatch
    #   missed_unmatch -> in old_unmatch but not in current_unmatch
    df = (
        df.withColumn('new_unmatch', array_except(df['current_unmatch'], df['old_unmatch']))
          .withColumn('missed_unmatch', array_except(df['old_unmatch'], df['current_unmatch']))
    )
    
    # Display the result
    df.show(truncate=False)
    
    

    Output-

    +------------+----------------+-----------+---------------+
    |old_unmatch |current_unmatch |new_unmatch|missed_unmatch |
    +------------+----------------+-----------+---------------+
    |[121, 122]  |[121, 123]      |[123]      |[122]          |
    |[124, 125]  |[124, 125]      |[]         |[]             |
    |[126, 127]  |[127, 128]      |[128]      |[126]          |
    +------------+----------------+-----------+---------------+