I have a SQL query which I am trying to transform into PySpark; it has some joins and multiple where conditions:
UPDATE COMPANY1
INNER JOIN COMPANY2
ON COMPANY1.C1_PROFIT = COMPANY2.C2_PROFIT
SET COMPANY2.C2_TARGET = "1"
WHERE (((COMPANY2.C2_TARGET) Is Null)
AND ((COMPANY1.C1_SALES) Is Null)
AND ((COMPANY2.C2_PROFIT) Is Not Null));
PySpark query I am trying to execute (df_1 -> COMPANY2 & df_2 -> COMPANY1):
join = ((df_1.C2_PROFIT == df_2.C1_PROFIT) & \
(df_1.C2_TARGET=='') & \
(df_2.C1_SALES=='') & \
(df_1.C2_PROFIT!=''))
df_1 = (df_1.alias('a')
.join(df_2.alias('b'), join, 'left')
.select(
*[c for c in df_1.columns if c != 'C2_TARGET'],
F.expr("nvl2(b.C1_PROFIT, '1', a.C2_TARGET) C2_TARGET")
)
)
But I am still getting null values in column "C2_TARGET". For information: column "C1_PROFIT" is null-free, but "C2_PROFIT" sometimes has nulls as well as values.
Example inputs:
+-----------+----------+
| C1_PROFIT | C1_SALES |
+-----------+----------+
|      5637 | Positive |
|      7464 |          |
|     43645 |          |
|     64657 | Growth P |
+-----------+----------+

+-----------+-----------+
| C2_PROFIT | C2_TARGET |
+-----------+-----------+
|           |           |
|      7464 |           |
|     43645 |           |
|     64657 |           |
+-----------+-----------+
Expected result: C2_TARGET should be set to "1" wherever the join and where conditions match. Eventually I also need the same pattern with multiple joins and where conditions, something like:
join_on = ((df_1.C1_PROFIT == df_2.C2_PROFIT) &              # JOIN CONDITION
(df_1.C1_REVENUE == df_3.C3_REVENUE_BREAK) &                 # JOIN CONDITION
(df_1.C1_LOSS == df_4.C4_TOTAL_LOSS) &                       # JOIN CONDITION
(df_4.MARGIN_OF_COMPANY > df_3.LAST_YEAR_MARGIN))            # WHERE CONDITION
df = (df_1.alias('a')
.join(df_2.alias('b'), join_on, 'left')
.join(df_3.alias('c'), join_on, 'left')
.join(df_4.alias('d'), join_on, 'left')
.select(
*[c for c in df_2.columns if c != 'C2_TARGET'],
F.expr("nvl2(b.C2_PROFIT, '1', a.C2_TARGET) C2_TARGET")
)
)
In this answer, you have an example of how to do
UPDATE A INNER JOIN B
...
SET A...
In your case, you SET B...:
UPDATE A INNER JOIN B
...
SET B...
You have correctly switched the order of your dataframes. What's not correct is that '' is not the same as null. You must use .isNull() and .isNotNull() in your conditions.
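You can see the difference with a tiny throwaway example (tmp and x are made-up names just for the demo; spark is your SparkSession):
tmp = spark.createDataFrame([('',), (None,)], ['x'])

# '' == null evaluates to null, and null does not pass a filter,
# so the row where x is null is silently dropped here:
tmp.filter(tmp.x == '').show()      # keeps only the '' row
tmp.filter(tmp.x.isNull()).show()   # keeps only the null row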
Example inputs:
from pyspark.sql import functions as F
df_1 = spark.createDataFrame(
[(5637, 'Positive'),
(7464, None),
(43645, None),
(64657, 'Growth P')],
['C1_PROFIT', 'C1_SALES'])
df_2 = spark.createDataFrame(
[(None, None),
(7464, None),
(43645, None),
(64657, None)],
'C2_PROFIT int, C2_TARGET string')
Script:
join_on = (df_1.C1_PROFIT == df_2.C2_PROFIT) & \
df_2.C2_TARGET.isNull() & \
df_1.C1_SALES.isNull() & \
df_2.C2_PROFIT.isNotNull()
df = (df_2.alias('a')
.join(df_1.alias('b'), join_on, 'left')
.select(
*[c for c in df_2.columns if c != 'C2_TARGET'],
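# nvl2(x, y, z) returns y when x is not null, else z: rows that matched the join get '1', unmatched rows keep their original C2_TARGET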
F.expr("nvl2(b.C1_PROFIT, '1', a.C2_TARGET) C2_TARGET")
)
)
df.show()
# +---------+---------+
# |C2_PROFIT|C2_TARGET|
# +---------+---------+
# | null| null|
# | 7464| 1|
# | 64657| null|
# | 43645| 1|
# +---------+---------+
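On the multi-join version from the question: a single join_on that references df_3 and df_4 cannot be evaluated while only df_1 and df_2 are being joined, so each .join needs its own condition, and the extra where-style comparison can be folded into the join where all of its columns are available. A rough sketch only, assuming df_3 and df_4 exist with the columns named in the question; as above, the dataframe whose column you are "updating" should sit on the left of the left joins, and the final select/nvl2 step stays the same:
# Each join only gets the condition it can evaluate at that point;
# the extra "where" comparison is folded into the last join so that
# unmatched rows survive the left joins with their original values.
join_2 = df_1.C1_PROFIT == df_2.C2_PROFIT                       # JOIN CONDITION
join_3 = df_1.C1_REVENUE == df_3.C3_REVENUE_BREAK               # JOIN CONDITION
join_4 = ((df_1.C1_LOSS == df_4.C4_TOTAL_LOSS) &
          (df_4.MARGIN_OF_COMPANY > df_3.LAST_YEAR_MARGIN))     # JOIN + WHERE CONDITION

df = (df_1.alias('a')
      .join(df_2.alias('b'), join_2, 'left')
      .join(df_3.alias('c'), join_3, 'left')
      .join(df_4.alias('d'), join_4, 'left'))
# ...then derive C2_TARGET with nvl2 (or when/otherwise), exactly as in the script above.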