
How to update a dataframe column value while joining with other dataframes in PySpark?


I have 3 DataFrames, df1 (EMPLOYEE_INFO), df2 (DEPARTMENT_INFO), and df3 (COMPANY_INFO), and I want to update a column in df1 by joining all three DataFrames. The column is FLAG_DEPARTMENT, which is in df1, and I need to set FLAG_DEPARTMENT='POLITICS'. In SQL the query would look like this:

UPDATE [COMPANY_INFO] INNER JOIN ([DEPARTMENT_INFO] 
INNER JOIN [EMPLOYEE_INFO] ON [DEPARTMENT_INFO].DEPT_ID = [EMPLOYEE_INFO].DEPT_ID)
ON [COMPANY_INFO].[COMPANY_DEPT_ID] = [DEPARTMENT_INFO].[DEP_COMPANYID]
SET EMPLOYEE_INFO.FLAG_DEPARTMENT = "POLITICS";

If the values in the columns of these three tables match, I need to set FLAG_DEPARTMENT='POLITICS' in my EMPLOYEE_INFO table.

How can I achieve the same thing in PySpark? I have just started learning PySpark and don't have much in-depth knowledge yet.


Solution

  • You can use a chain of joins with a select on top of it.

    Suppose that you have the following PySpark DataFrames:

    employee_df
    +---------+-------+
    |     Name|dept_id|
    +---------+-------+
    |     John| dept_a|
    |      Liù| dept_b|
    |     Luke| dept_a|
    |  Michail| dept_a|
    |      Noe| dept_e|
    |Shinchaku| dept_c|
    |     Vlad| dept_e|
    +---------+-------+
    
    department_df
    +-------+----------+------------+
    |dept_id|company_id| description|
    +-------+----------+------------+
    | dept_a|  company1|Department A|
    | dept_b|  company2|Department B|
    | dept_c|  company5|Department C|
    | dept_d|  company3|Department D|
    +-------+----------+------------+
    
    company_df
    +----------+-----------+
    |company_id|description|
    +----------+-----------+
    |  company1|  Company 1|
    |  company2|  Company 2|
    |  company3|  Company 3|
    |  company4|  Company 4|
    +----------+-----------+
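
    (If you want to reproduce these inputs to follow along, here is a minimal
    sketch, assuming an existing SparkSession named spark:)

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    employee_df = spark.createDataFrame(
        [('John', 'dept_a'), ('Liù', 'dept_b'), ('Luke', 'dept_a'),
         ('Michail', 'dept_a'), ('Noe', 'dept_e'), ('Shinchaku', 'dept_c'),
         ('Vlad', 'dept_e')],
        ['Name', 'dept_id'],
    )
    department_df = spark.createDataFrame(
        [('dept_a', 'company1', 'Department A'),
         ('dept_b', 'company2', 'Department B'),
         ('dept_c', 'company5', 'Department C'),
         ('dept_d', 'company3', 'Department D')],
        ['dept_id', 'company_id', 'description'],
    )
    company_df = spark.createDataFrame(
        [('company1', 'Company 1'), ('company2', 'Company 2'),
         ('company3', 'Company 3'), ('company4', 'Company 4')],
        ['company_id', 'description'],
    )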
    

    Then you can run the following code to add the flag_department column to your employee_df:

    from pyspark.sql import functions as F

    employee_df = (
        employee_df.alias('a')
        # attach each employee's department; a left join keeps employees
        # whose dept_id has no match in department_df
        .join(
            department_df.alias('b'),
            on='dept_id',
            how='left',
        )
        # attach the department's company via its company_id
        .join(
            company_df.alias('c'),
            on=F.col('b.company_id') == F.col('c.company_id'),
            how='left',
        )
        .select(
            # keep the original employee columns
            *[F.col(f'a.{c}') for c in employee_df.columns],
            # flag rows that matched in both department_df and company_df
            F.when(
                F.col('b.dept_id').isNotNull() & F.col('c.company_id').isNotNull(),
                F.lit('POLITICS')
            ).alias('flag_department')
        )
    )
    

    The new employee_df will be:

    +---------+-------+---------------+
    |     Name|dept_id|flag_department|
    +---------+-------+---------------+
    |     John| dept_a|       POLITICS|
    |      Liù| dept_b|       POLITICS|
    |     Luke| dept_a|       POLITICS|
    |  Michail| dept_a|       POLITICS|
    |      Noe| dept_e|           null|
    |Shinchaku| dept_c|           null|
    |     Vlad| dept_e|           null|
    +---------+-------+---------------+
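
    Noe and Vlad end up with a null flag because dept_e has no row in
    department_df, and Shinchaku does because dept_c maps to company5, which
    is missing from company_df. F.when without an otherwise clause yields
    null for non-matching rows; if you'd rather have an explicit default,
    a sketch of a variation (the 'NO_POLITICS' literal is just an
    illustrative placeholder, not from the original question):

    # drop-in replacement for the F.when(...) expression in the select above
    flag = (
        F.when(
            F.col('b.dept_id').isNotNull() & F.col('c.company_id').isNotNull(),
            F.lit('POLITICS')
        )
        .otherwise(F.lit('NO_POLITICS'))  # placeholder default instead of null
        .alias('flag_department')
    )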