
How to make a left join in Spark where the keys can have multiple granularities?


I have this sample table that represents the salaries of all the company's employees:

PLANT  DEPTO  NAME   VALUE
1      MG     KEVIN  1100
2      SP     ETHAN  1200
3      DF     JUAN   1200
4      SP     BETH   1100
5      SP     JOHN   1000

And I also have this other table that represents the increases that need to be applied to the salaries:

INCREASE_ID  PLANT  DEPTO  NAME  FACTOR
IC1          1      MG           1.2
IC2          2      SP           1.4
IC3          3      DF           1.3
IC4          4      SP     BETH  1.3
IC5                 SP           1.02

Increases are cumulative, and an increase applies when all of the keys it specifies match (empty keys match anything). As an example, Ethan gets two raises combined:

  • IC2: Because PLANT = 2 and DEPTO = SP
  • IC5: Because DEPTO = SP
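In other words, an empty key in the increase table acts as a wildcard, and every key an increase does specify must equal the employee's value. A minimal plain-Python sketch of that rule (the dict rows and the `increase_applies` helper are illustrative, not part of the actual tables or code):

```python
KEYS = ("PLANT", "DEPTO", "NAME")

def increase_applies(employee, increase):
    # A None key in the increase row acts as a wildcard;
    # every key the row does specify must match the employee.
    return all(increase[k] is None or increase[k] == employee[k] for k in KEYS)

ethan = {"PLANT": 2, "DEPTO": "SP", "NAME": "ETHAN"}
increases = [
    {"INCREASE_ID": "IC1", "PLANT": 1, "DEPTO": "MG", "NAME": None, "FACTOR": 1.2},
    {"INCREASE_ID": "IC2", "PLANT": 2, "DEPTO": "SP", "NAME": None, "FACTOR": 1.4},
    {"INCREASE_ID": "IC5", "PLANT": None, "DEPTO": "SP", "NAME": None, "FACTOR": 1.02},
]

applicable = [r["INCREASE_ID"] for r in increases if increase_applies(ethan, r)]
# applicable == ["IC2", "IC5"]; IC1 fails because PLANT and DEPTO differ
```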

With this in mind, my goal is to get the following table when the two tables above are combined:

PLANT  DEPTO  NAME   VALUE  LIST_OF_INCREASE_ID  FACTOR_COMBINED
1      MG     KEVIN  1100   IC1                  1.20
2      SP     ETHAN  1200   IC2 and IC5          1.43 (1.4 * 1.02)
3      DF     JUAN   1200   IC3                  1.30
4      SP     BETH   1100   IC4 and IC5          1.33 (1.3 * 1.02)
5      SP     JOHN   1000   IC5                  1.02

Does anyone have a suggestion for a method or join type that "avoids" these null cases, so a row still matches when only a subset of the key columns (PLANT, DEPTO, or NAME) is specified?


Solution

  • For future readers with the same problem: I was able to solve it by following the conditional join idea described in this post.

    Basically, I built a join condition that replaces each null key in the increase table with the corresponding value from the fact table (via coalesce), so a null key matches any row. The left join then produces the following table:

    from pyspark.sql import functions as fn

    # Join condition: a null key in increase_df is replaced (via coalesce)
    # with the fact_df value, so it compares equal to any row.
    condition = (
       (fn.coalesce(increase_df.PLANT, fact_df.PLANT) == fact_df.PLANT) &
       (fn.coalesce(increase_df.DEPTO, fact_df.DEPTO) == fact_df.DEPTO) &
       (fn.coalesce(increase_df.NAME, fact_df.NAME) == fact_df.NAME)
    )
    
    final_df = (
        fact_df
        .join(increase_df, condition, "left")
    )
    
    final_df.show()
    +-----+-----+-----+------+-----------+-----+-----+----+------+
    |PLANT|DEPTO| NAME| VALUE|INCREASE_ID|PLANT|DEPTO|NAME|FACTOR|
    +-----+-----+-----+------+-----------+-----+-----+----+------+
    |    1|   MG|KEVIN|1100.0|        IC1|    1|   MG|null|   1.2|
    |    2|   SP|ETHAN|1200.0|        IC5| null|   SP|null|  1.02|
    |    2|   SP|ETHAN|1200.0|        IC2|    2|   SP|null|   1.4|
    |    3|   DF| JUAN|1200.0|        IC3|    3|   DF|null|   1.3|
    |    4|   SP| BETH|1100.0|        IC5| null|   SP|null|  1.02|
    |    4|   SP| BETH|1100.0|        IC4|    4|   SP|BETH|   1.3|
    |    5|   SP| JOHN|1000.0|        IC5| null|   SP|null|  1.02|
    +-----+-----+-----+------+-----------+-----+-----+----+------+
    

    After that, I just aggregated the rows per employee, multiplying the factors and collecting the increase IDs:
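    The post doesn't show the aggregation code itself, so here is a self-contained sketch of how that step could look. It rebuilds the two dataframes, reuses the coalesce join above, and aggregates with `fn.product` (requires Spark 3.2+); `fn.sort_array` is added so the ID list order is deterministic (the original output shows "IC5 and IC2" because `collect_list` order is not guaranteed), and unmatched rows default to a factor of 1.0:

    ```python
    from pyspark.sql import SparkSession, functions as fn

    spark = SparkSession.builder.master("local[1]").appName("increases").getOrCreate()

    fact_df = spark.createDataFrame(
        [(1, "MG", "KEVIN", 1100.0), (2, "SP", "ETHAN", 1200.0),
         (3, "DF", "JUAN", 1200.0), (4, "SP", "BETH", 1100.0),
         (5, "SP", "JOHN", 1000.0)],
        ["PLANT", "DEPTO", "NAME", "VALUE"],
    )
    increase_df = spark.createDataFrame(
        [("IC1", 1, "MG", None, 1.2), ("IC2", 2, "SP", None, 1.4),
         ("IC3", 3, "DF", None, 1.3), ("IC4", 4, "SP", "BETH", 1.3),
         ("IC5", None, "SP", None, 1.02)],
        ["INCREASE_ID", "PLANT", "DEPTO", "NAME", "FACTOR"],
    )

    # Null keys in increase_df act as wildcards via coalesce.
    condition = (
        (fn.coalesce(increase_df.PLANT, fact_df.PLANT) == fact_df.PLANT)
        & (fn.coalesce(increase_df.DEPTO, fact_df.DEPTO) == fact_df.DEPTO)
        & (fn.coalesce(increase_df.NAME, fact_df.NAME) == fact_df.NAME)
    )
    final_df = fact_df.join(increase_df, condition, "left")

    # One row per employee: multiply all matched factors (1.0 when no match)
    # and join the matched increase IDs with " and ".
    agg_df = (
        final_df
        .groupBy(fact_df.PLANT, fact_df.DEPTO, fact_df.NAME, fact_df.VALUE)
        .agg(
            fn.product(fn.coalesce(increase_df.FACTOR, fn.lit(1.0))).alias("FACTOR"),
            fn.concat_ws(
                " and ", fn.sort_array(fn.collect_list(increase_df.INCREASE_ID))
            ).alias("INCREASE_ID"),
        )
    )
    agg_df.show()
    ```
    
    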

    +-----+-----+-----+------+------+-----------+
    |PLANT|DEPTO| NAME| VALUE|FACTOR|INCREASE_ID|
    +-----+-----+-----+------+------+-----------+
    |    1|   MG|KEVIN|1100.0|   1.2|        IC1|
    |    2|   SP|ETHAN|1200.0| 1.428|IC5 and IC2|
    |    3|   DF| JUAN|1200.0|   1.3|        IC3|
    |    4|   SP| BETH|1100.0| 1.326|IC5 and IC4|
    |    5|   SP| JOHN|1000.0|  1.02|        IC5|
    +-----+-----+-----+------+------+-----------+