
Join two tables by column name when the column names for joining are stored in a table


I am trying to join two tables using a list created from another table, but I haven't found a way to make it work.

Base would be:

data = [('B_YEAR, B_ORG', 'YEAR, ORG', 1), ('B_YEAR', 'YEAR', 2) ]
test = spark.createDataFrame(data, ['key_orig', 'key_map', 'Fil_Number']) 
fc_year == 2024

Now I try to join:

for i in range (1,x):
test_1= test.filter(col("Fil_Number")==1)
list_key_orig = test_1.select("key_orig").collect()
list_key_map = test_1.select("key_map").collect()

df_calc_values =spark.read.table("hive_metastore.reporting_datalake.df_calc_fc_values")

df_calc_values = df_calc_values.filter((col("GJ")==fc_year))
display(df_calc_values)

df_new= df_orig.join(df_calc_values, on = (key_orig1.key_orig == key_map1.key_map) ,how = "left")

I already searched on Google but found no result.

Tables:

Table - df_calc_values (screenshot)

Table - df_orig (screenshot)


Solution

  • There is no information about where the original DataFrame (`df_orig`) comes from. Your for loop is also not correctly indented, so it is not clear what is part of the loop and what is not, and your `fc_year` variable is never assigned (`fc_year == 2024` is a comparison, not an assignment). Finally, you should not import everything via `from pyspark.sql.functions import *` — wildcard imports pollute the namespace and can cause naming conflicts in your code.

    Anyway, here is an answer showing how to build a join-condition string from your given information and how to join two DataFrames with it.

    from pyspark.sql import functions as f

    df_calc_values = spark.read.table("hive_metastore.reporting_datalake.df_calc_fc_values")
    df_orig = spark.read.table("original_table from wherever the table came from")

    # The highest Fil_Number determines how many join configurations exist.
    max_fil_no = test.agg(f.max("Fil_Number").alias("max")).first().max
    fc_year = 2024
    alias_original = "original"
    alias_calc_val = "calc_vals"

    # The year filter does not depend on the loop, so apply it once up front.
    df_calc_values = df_calc_values.filter(f.col("GJ") == fc_year)

    dfs = {}

    for i in range(1, max_fil_no + 1):
        test_1 = test.filter(f.col("Fil_Number") == i)
        list_key_orig = test_1.select("key_orig").collect()
        list_key_map = test_1.select("key_map").collect()

        # Split the comma-separated column lists and strip the spaces.
        join_cols_original = [x.replace(" ", "") for x in list_key_orig[0].key_orig.split(",")]
        join_cols_calc_vals = [x.replace(" ", "") for x in list_key_map[0].key_map.split(",")]

        # Build one equality expression per column pair,
        # e.g. "original.B_YEAR == calc_vals.YEAR".
        join_str_combos = []
        for key_orig, key_cal_vals in zip(join_cols_original, join_cols_calc_vals):
            join_str_combos.append(f"{alias_original}.{key_orig} == {alias_calc_val}.{key_cal_vals}")

        join_str = " AND ".join(join_str_combos)

        df_new = (
            df_orig.alias(alias_original)
            .join(
                df_calc_values.alias(alias_calc_val),
                on=f.expr(join_str),
                how="left"
            )
        )
        dfs[i] = df_new
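
    The core of the approach is the string manipulation that turns the two comma-separated column lists into a SQL join condition. That part can be checked without a Spark session; here is a minimal sketch in plain Python (the helper name `build_join_str` is made up for illustration, and the sample values come from the question's `test` DataFrame):

    ```python
    # Build a SQL join condition from two comma-separated column lists,
    # mirroring the loop body above (pure Python, no Spark needed).
    def build_join_str(key_orig: str, key_map: str,
                       alias_original: str = "original",
                       alias_calc_val: str = "calc_vals") -> str:
        # Split on commas and strip the spaces, as in the answer's code.
        cols_original = [x.replace(" ", "") for x in key_orig.split(",")]
        cols_calc_vals = [x.replace(" ", "") for x in key_map.split(",")]
        # One equality expression per column pair, joined with AND.
        combos = [
            f"{alias_original}.{o} == {alias_calc_val}.{m}"
            for o, m in zip(cols_original, cols_calc_vals)
        ]
        return " AND ".join(combos)

    # Row with Fil_Number == 1 from the question's sample data:
    print(build_join_str("B_YEAR, B_ORG", "YEAR, ORG"))
    # original.B_YEAR == calc_vals.YEAR AND original.B_ORG == calc_vals.ORG
    ```

    The resulting string is exactly what `f.expr(join_str)` receives as the join condition inside the loop.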