Search code examples
pythonapache-sparkpysparkpyspark-pandaspyspark-schema

Pyspark: Compare Column Values across different dataframe


we are planning to do the following, compare two dataframe, based on comparision add values into first dataframe and then groupby to have combined data.

We are using pyspark dataframe and the following are our dataframes.

Dataframe1:

| Manager    | Department          |  isHospRelated
| --------   | --------------      | --------------
| Abhinav    | Medical             |  t
| Abhinav    | Surgery             |  t
| Rajesh     | Payments            |  t
| Rajesh     | HR                  |  t
| Sonu       | Onboarding          |  t
| Sonu       | Surgery             |  t
| Sonu       | HR                  |  t

Dataframe2:

| OrgSupported| OrgNonSupported          |
| --------   | --------------            |
| Medical    | Payment                   |
| Surgery    | Onboarding                |

We plan to compare Dataframe1 with Dataframe2 and obtain the following results:

| Manager    | Department          | Org Supported | Org NotSupported
| --------   | --------------      | ------------- | ----------------
| Abhinav    | Medical             | Medical       |        
| Abhinav    | Surgery             | Surgery       |
| Rajesh     | Payments            |               | Payments
| Rajesh     | HR                  |               | HR
| Sonu       | Onboarding          |               | Onboarding
| Sonu       | Surgery             | Surgery       | 
| Sonu       | HR                  |               | HR

Finally we would like to groupthem as follows:

| Manager    | Department         | isHospRelated  | Org Supported  | Org NotSupported
| --------   | --------------     | ------------   | -------------  | ----------------
| Abhinav    | Medical,Surgery    | t              | Medical,Surgery|        
| Rajesh     | Payments, HR       | t              |                | Payments, HR
| Sonu     | Onboarding,Surgery,HR| t              |  Surgery       | Onboarding, HR

We are using pyspark in our code, any suggestions how do we do these kind of comparison in pyspark.


Solution

  • I have one solution, i am joining df2 two times first to get NonSupported then to get supported. When its done its easy to group by manager and collect_lists of values

    You can try something like this:

    import pyspark.sql.functions as F
    
    df = [
        {"Manager": "Abhinav", "Department": "Medical", "isHospRelated": "t"},
        {"Manager": "Abhinav", "Department": "Surgery", "isHospRelated": "t"},
        {"Manager": "Rajesh", "Department": "Payments", "isHospRelated": "t"},
        {"Manager": "Rajesh", "Department": "Hr", "isHospRelated": "t"},
        {"Manager": "Sonu", "Department": "Onboarding", "isHospRelated": "t"},
        {"Manager": "Sonu", "Department": "Surgery", "isHospRelated": "t"},
    ]
    
    df2 = [
        {"OrgSupported": "Medical", "OrgNonSupported": "Payments"},
        {"OrgSupported": "Surgery", "OrgNonSupported": "Onboarding"},
    ]
    
    df = spark.createDataFrame(df)
    df2 = spark.createDataFrame(df2)
    
    dfWithNonSupported = df.join(
        df2.drop("OrgNonSupported"), df.Department == df2.OrgSupported, "left"
    )
    dfWithSupportedAndNonSupported = dfWithNonSupported.join(
        df2.drop("OrgSupported"),
        dfWithNonSupported.Department == df2.OrgNonSupported,
        "left",
    )
    
    finalDf = dfWithSupportedAndNonSupported.withColumn(
        "OrgNonSupported",
        F.when(
            (F.col("OrgSupported").isNull()) & (F.col("OrgSupported").isNull()),
            F.col("Department"),
        ).otherwise(F.col("OrgNonSupported")),
    )
    
    finalDf.groupBy("Manager").agg(
        F.collect_list("Department").alias("Department"),
        F.collect_list("OrgSupported").alias("OrgSupported"),
        F.collect_list("OrgNonSupported").alias("OrgNonSupported"),
        F.first("isHospRelated").alias("isHospRelated"),
    ).show()
    

    output:

    +-------+--------------------+------------------+---------------+-------------+
    |Manager|          Department|      OrgSupported|OrgNonSupported|isHospRelated|
    +-------+--------------------+------------------+---------------+-------------+
    |Abhinav|  [Medical, Surgery]|[Medical, Surgery]|             []|            t|
    | Rajesh|      [Payments, Hr]|                []| [Payments, Hr]|            t|
    |   Sonu|[Onboarding, Surg...|         [Surgery]|   [Onboarding]|            t|
    +-------+--------------------+------------------+---------------+-------------+