azure, apache-spark, pyspark, azure-databricks

How do I join on two or more rating factors?


test_policies_rl_data = [
    (1, 'A', 0.5, 0.8),
    (2, 'B', 0.6, 0.9),
    (3, 'C', 0.7, 1.0)
]
rates_table_data = [
    (1, 'A', 0.5, 0.8, None, 10, 0.5, 1, 100),
    (2, 'B', 0.6, 0.9, None, 20, 0.6, 2, 200),
    (3, 'C', 0.7, 1.0, None, 30, 0.7, 3, 300)
]
test_policies_rl = spark.createDataFrame(test_policies_rl_data, ['version', 'sub_class', 'rf_1', 'rl_1'])
# rating_factor_2 is None in every row, so Spark cannot infer its type; pass an explicit schema.
rates_table = spark.createDataFrame(rates_table_data,
    'version BIGINT, cover STRING, RF DOUBLE, rating_factor_1 DOUBLE, rating_factor_2 STRING, '
    'rating_factor_amount BIGINT, rating_factor_coefficient DOUBLE, rating_factor_level BIGINT, rating_factor_rate BIGINT')
test_policies_rl.createOrReplaceTempView("test_policies_rl")
rates_table.createOrReplaceTempView("rates_table")
num_columns = 3
base_query = """
SELECT 
    t.*,
    r.rating_factor_amount AS rating_factor_amount_{},
    r.rating_factor_coefficient AS rating_factor_coefficient_{},
    r.rating_factor_level AS rating_factor_level_{},
    r.rating_factor_rate AS rating_factor_rate_{}
FROM test_policies_rl t
LEFT JOIN rates_table r
ON t.version = r.version
AND t.sub_class = r.cover
AND t.rf_1 = r.RF
AND t.rl_1 = r.rating_factor_1
"""
for i in range(1, num_columns + 1):
    query = base_query.format(i, i, i, i)
    policies_coeff = spark.sql(query)
    policies_coeff.show()

What if I have more than one condition to join on for each rating factor in test_policies_rl_data?

First, rates_table is a table of tables: I can access the rates for each rating factor with:

rates_table['rating_factor_1']
rates_table['rating_factor_2']

When I access these tables, only the rating_factor_1 column in the rates table is filled. But what if the rating_factor_2 column is also filled, and I have to match it against one of the columns in test_policies_rl_data to get the correct rating_factor_amount, coefficient, and rate?

Example:

test_policies_rl_data = [
    (1, 'A','private', 0.5, 0.8),
    (2, 'B', 'business',0.6, 0.9),
    (3, 'C', 'private',0.7, 1.0)
]

rates_table_data = [
    (1, 'A', 0.5, 0.8, 'private', 10, 0.5, 1, 100),
    (2, 'B', 0.6, 0.9, 'business', 20, 0.6, 2, 200),
    (3, 'C', 0.7, 1.0, 'private', 30, 0.7, 3, 300),
    (3, 'C', 0.7, 1.0, 'business', 30, 0.7, 3, 300)
]

test_policies_rl = spark.createDataFrame(test_policies_rl_data, ['version', 'sub_class', 'use', 'rf_1', 'rl_1'])
rates_table = spark.createDataFrame(rates_table_data, ['version', 'cover', 'RF', 'rating_factor_1', 'rating_factor_2',
                                                       'rating_factor_amount', 'rating_factor_coefficient', 'rating_factor_level', 'rating_factor_rate'])
test_policies_rl.createOrReplaceTempView("test_policies_rl")
rates_table.createOrReplaceTempView("rates_table")

In this case, rating_factor_2 in rates_table should also match the 'use' column in test_policies_rl. I am not sure how to make the code recognize that a two-factor join is needed, because sometimes it is a one-factor join and sometimes a two-factor join.


Solution

  • I tried the following as an example.

    test_policies_rl_data = [
        (1, 'A', 'private', 0.5, 0.8),
        (2, 'B', 'business', 0.6, 0.9),
        (3, 'C', 'private', 0.7, 1.0)
    ]
    rates_table_data = [
        (1, 'A', 0.5, 0.8, 'private', 10, 0.5, 1, 100),
        (2, 'B', 0.6, 0.9, 'business', 20, 0.6, 2, 200),
        (3, 'C', 0.7, 1.0, 'private', 30, 0.7, 3, 300),
        (3, 'C', 0.7, 1.0, 'business', 30, 0.7, 3, 300)
    ]
    test_policies_rl = spark.createDataFrame(test_policies_rl_data, ['version', 'sub_class', 'use', 'rf_1', 'rl_1'])
    rates_table = spark.createDataFrame(rates_table_data, ['version', 'cover', 'RF', 'rating_factor_1', 'rating_factor_2',
                                                           'rating_factor_amount', 'rating_factor_coefficient', 'rating_factor_level', 'rating_factor_rate'])
    test_policies_rl.createOrReplaceTempView("test_policies_rl")
    rates_table.createOrReplaceTempView("rates_table")
    num_columns = 3
    base_query = """
    SELECT 
        t.*,
        r.rating_factor_amount AS rating_factor_amount_{},
        r.rating_factor_coefficient AS rating_factor_coefficient_{},
        r.rating_factor_level AS rating_factor_level_{},
        r.rating_factor_rate AS rating_factor_rate_{}
    FROM test_policies_rl t
    LEFT JOIN rates_table r
    ON t.version = r.version
    AND t.sub_class = r.cover
    AND t.rf_1 = r.RF
    AND t.rl_1 = r.rating_factor_1
    AND (t.use = r.rating_factor_2 OR (t.use = 'business' AND r.rating_factor_2 IS NULL))
    """
    for i in range(1, num_columns + 1):
        query = base_query.format(i, i, i, i)
        policies_coeff = spark.sql(query)
        policies_coeff.show()
    


    • I added one more condition to the join, on the use and rating_factor_2 columns. It matches a row when the two values are equal, or when rating_factor_2 is null and use is 'business'.

    • The code creates the DataFrames, registers them as temporary views, and runs the left join through spark.sql. Each iteration of the loop prints the rows matched on all of the join conditions.

    • With this condition the query handles both one-factor and two-factor joins for the given data. Adjust the logic inside the parentheses to suit your own data.
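
As one possible way to adjust that logic, the join conditions could be generated instead of hard-coded. The sketch below is plain Python that only builds the SQL string. The helper name build_join_query and its required/optional arguments are hypothetical, not part of the original code. It also makes a different assumption from the condition above: a NULL in an optional rates column (here rating_factor_2) is treated as "this factor does not apply", so it matches any policy value rather than only 'business'.

```python
# Hypothetical helper: builds the LEFT JOIN query from lists of column pairs.
# Each pair is (column in test_policies_rl, column in rates_table).
RATE_COLUMNS = (
    "rating_factor_amount",
    "rating_factor_coefficient",
    "rating_factor_level",
    "rating_factor_rate",
)


def build_join_query(suffix, required, optional):
    """Return the query text; optional pairs also match when the rates column is NULL."""
    select_cols = ["t.*"] + [f"r.{col} AS {col}_{suffix}" for col in RATE_COLUMNS]
    # Required pairs must always be equal.
    conditions = [f"t.{t_col} = r.{r_col}" for t_col, r_col in required]
    # Assumption: NULL in the rates column means the factor is not used for that row.
    conditions += [
        f"(r.{r_col} IS NULL OR t.{t_col} = r.{r_col})"
        for t_col, r_col in optional
    ]
    return (
        "SELECT " + ", ".join(select_cols)
        + " FROM test_policies_rl t LEFT JOIN rates_table r ON "
        + " AND ".join(conditions)
    )


query = build_join_query(
    1,
    required=[("version", "version"), ("sub_class", "cover"),
              ("rf_1", "RF"), ("rl_1", "rating_factor_1")],
    optional=[("use", "rating_factor_2")],
)
print(query)
# On a cluster where both temp views are registered:
# policies_coeff = spark.sql(query)
```

With this shape, a one-factor rates row (rating_factor_2 left NULL) and a two-factor rates row (rating_factor_2 filled) go through the same query, and further optional factors can be added by appending pairs to the optional list.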