
PySpark: Compare two DataFrames on a common string column and generate a Boolean result column with withColumn()


I have two dataframes ddf_1 and ddf_2 that share a common column of string IDs. My goal is to add a new Boolean is_fine column to ddf_1 that contains True if the ID appears in both ddf_1 and ddf_2, or False if the ID appears only in ddf_1.

Consider this example data:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# example data
data_1 = { 
    'fruits': ["apples", "banana", "cherry"],
    'myid': ['1-12', '2-12', '3-13'],
    'meat': ["pig", "cow", "chicken"]}

data_2 = { 
    'furniture': ["table", "chair", "lamp"],
    'myid': ['1-12', '0-11', '2-12'],
    'clothing': ["pants", "shoes", "socks"]}

df_1 = pd.DataFrame(data_1)
ddf_1 = spark.createDataFrame(df_1)

df_2 = pd.DataFrame(data_2)
ddf_2 = spark.createDataFrame(df_2)

I imagine a function something like this:

def func(df_1, df_2, column_1, column_2):
    if df_1.column_1 != df_2.column_2:
        return df_1.withColumn('is_fine', False)
    else:
        return df_1.withColumn('is_fine', True)

The desired output should look like this:

+------+----+-------+-------+
|fruits|myid|   meat|is_fine|
+------+----+-------+-------+
|apples|1-12|    pig|   true|
|banana|2-12|    cow|   true|
|cherry|3-13|chicken|  false|
+------+----+-------+-------+


Solution

  • You can perform a left outer join between the two dataframes on the myid column and use a simple when/otherwise expression to derive the is_fine column, as shown below:

    import pyspark.sql.functions as F
    
    # Left join keeps every row of ddf_1; IDs missing from ddf_2 come back null
    ddf_1.join(ddf_2, ddf_1.myid == ddf_2.myid, 'left') \
        .withColumn('is_fine', F.when(ddf_2.myid.isNull(), False).otherwise(True)) \
        .select(ddf_1['fruits'], ddf_1['myid'], ddf_1['meat'], 'is_fine') \
        .show()
    

    Output:

    +------+----+-------+-------+
    |fruits|myid|   meat|is_fine|
    +------+----+-------+-------+
    |cherry|3-13|chicken|  false|
    |apples|1-12|    pig|   true|
    |banana|2-12|    cow|   true|
    +------+----+-------+-------+
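
  • Note that if ddf_2 can contain the same ID more than once, the join above duplicates the matching rows of ddf_1. A sketch of one way around that (assuming Spark 2.3+ for the Boolean fillna; ids_in_ddf_2 is just an illustrative name): join against the distinct IDs of ddf_2 tagged with a literal True, then fill the remaining nulls with False.

    import pyspark.sql.functions as F

    # Distinct IDs from ddf_2, each tagged with a literal True flag
    ids_in_ddf_2 = ddf_2.select('myid').distinct().withColumn('is_fine', F.lit(True))

    # Joining on the column name keeps a single myid column in the result;
    # IDs absent from ddf_2 get a null is_fine, which fillna turns into False
    result = ddf_1.join(ids_in_ddf_2, on='myid', how='left') \
        .fillna(False, subset=['is_fine'])

    result.show()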