Tags: sql, join, pyspark, apache-spark-sql, sql-update

UPDATE a column value using JOIN and multiple WHERE conditions in PySpark


I have a SQL query that I am trying to translate into PySpark; it has a join and multiple WHERE conditions:

UPDATE COMPANY1
INNER JOIN COMPANY2
ON COMPANY1.C1_PROFIT = COMPANY2.C2_PROFIT 
SET COMPANY2.C2_TARGET = "1"
WHERE (((COMPANY2.C2_TARGET) Is Null)
  AND ((COMPANY1.C1_SALES) Is Null)
  AND ((COMPANY2.C2_PROFIT) Is Not Null));

The PySpark query I am trying to execute (df_1 corresponds to COMPANY2 and df_2 to COMPANY1):

join = ((df_1.C2_PROFIT == df_2.C1_PROFIT) & \
  (df_1.C2_TARGET=='') & \
  (df_2.C1_SALES=='') & \
  (df_1.C2_PROFIT!=''))
df_1 = (df_1.alias('a')
  .join(df_2.alias('b'), join, 'left')
  .select(
    *[c for c in df_1.columns if c != 'C2_TARGET'],
    F.expr("nvl2(b.C1_PROFIT, '1', a.C2_TARGET) C2_TARGET")
  )
)

But I am still getting null values in column "C2_TARGET".

For information: column "C1_PROFIT" is null-free, but "C2_PROFIT" contains both null and non-null values.

Example inputs:

+------------------+--------------+
|  C1_PROFIT       |C1_SALES      |
+------------------+--------------+
|5637              |     Positive |
|7464              |              |
|43645             |              |
|64657             |      Growth P|
+------------------+--------------+

+------------------+--------------+
|  C2_PROFIT       |C2_TARGET     |
+------------------+--------------+
|                  |              |
|7464              |              |
|43645             |              |
|64657             |              |
+------------------+--------------+

Expected result:

+------------------+--------------+
|  C2_PROFIT       |C2_TARGET     |
+------------------+--------------+
|                  |              |
|7464              |1             |
|43645             |1             |
|64657             |              |
+------------------+--------------+

I also tried extending this approach to multiple joins and an extra WHERE condition:

join_on = ((df_1.C1_PROFIT == df_2.C2_PROFIT) &               # join condition
           (df_1.C1_REVENUE == df_3.C3_REVENUE_BREAK) &       # join condition
           (df_1.C1_LOSS == df_4.C4_TOTAL_LOSS) &             # join condition
           (df_4.MARGIN_OF_COMPANY > df_3.LAST_YEAR_MARGIN))  # WHERE condition
df = (df_1.alias('a')
    .join(df_2.alias('b'), join_on, 'left')
    .join(df_3.alias('c'), join_on, 'left')
    .join(df_4.alias('d'), join_on, 'left')
    .select(
        *[c for c in df_2.columns if c != 'C2_TARGET'],
        F.expr("nvl2(b.C2_PROFIT, '1', a.C2_TARGET) C2_TARGET")
    )
)
Solution

  • In this answer, you have an example of how to do

    UPDATE A INNER JOIN B
    ...
    SET A...
    

    In your case, you SET B...:

    UPDATE A INNER JOIN B
    ...
    SET B...
    

    You have correctly switched the order of your dataframes.

    What's not correct is the null check: '' (an empty string) is not the same as null. You must use .isNull() and .isNotNull() in your conditions.
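
    A quick way to see the difference (a minimal sketch, assuming an active SparkSession named spark): comparing a null to '' yields null, not False, so any join or filter condition built with == '' silently drops exactly the rows you are trying to match.

    df = spark.createDataFrame([(None,)], 'val string')
    df.select(
        (F.col('val') == '').alias('eq_empty'),  # null, never True
        F.col('val').isNull().alias('is_null')   # True
    ).show()
    # +--------+-------+
    # |eq_empty|is_null|
    # +--------+-------+
    # |    null|   true|
    # +--------+-------+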


    Example inputs:

    from pyspark.sql import functions as F
    df_1 = spark.createDataFrame(
        [(5637, 'Positive'),
         (7464, None),
         (43645, None),
         (64657, 'Growth P')],
        ['C1_PROFIT', 'C1_SALES'])
    
    df_2 = spark.createDataFrame(
        [(None, None),
         (7464, None),
         (43645, None),
         (64657, None)],
        'C2_PROFIT int, C2_TARGET string')
    

    Script:

    join_on = (df_1.C1_PROFIT == df_2.C2_PROFIT) & \
              df_2.C2_TARGET.isNull() & \
              df_1.C1_SALES.isNull() & \
              df_2.C2_PROFIT.isNotNull()
    df = (df_2.alias('a')
        .join(df_1.alias('b'), join_on, 'left')
        .select(
            *[c for c in df_2.columns if c != 'C2_TARGET'],
            F.expr("nvl2(b.C1_PROFIT, '1', a.C2_TARGET) C2_TARGET")
        )
    )
    
    df.show()
    # +---------+---------+
    # |C2_PROFIT|C2_TARGET|
    # +---------+---------+
    # |     null|     null|
    # |     7464|        1|
    # |    64657|     null|
    # |    43645|        1|
    # +---------+---------+
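
    For the record, nvl2(x, y, z) returns y when x is not null and z otherwise, so the expression writes '1' exactly for the rows that found a join partner and keeps the old C2_TARGET for the rest. If you prefer to stay in the DataFrame API, the same logic can be written with F.when; this is just an equivalent sketch of the script above, reusing its join_on:

    df = (df_2.alias('a')
        .join(df_1.alias('b'), join_on, 'left')
        .select(
            *[c for c in df_2.columns if c != 'C2_TARGET'],
            # matched row -> '1', otherwise keep the existing value
            F.when(F.col('b.C1_PROFIT').isNotNull(), F.lit('1'))
             .otherwise(F.col('a.C2_TARGET'))
             .alias('C2_TARGET')
        )
    )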