I am trying to join two tables using a list of key columns that was created from another table, but I haven't found a way to make it work.
Base would be:
data = [('B_YEAR, B_ORG', 'YEAR, ORG', 1), ('B_YEAR', 'YEAR', 2) ]
test = spark.createDataFrame(data, ['key_orig', 'key_map', 'Fil_Number'])
fc_year == 2024
Now I try to join:
for i in range (1,x):
test_1= test.filter(col("Fil_Number")==1)
list_key_orig = test_1.select("key_orig").collect()
list_key_map = test_1.select("key_map").collect()
df_calc_values =spark.read.table("hive_metastore.reporting_datalake.df_calc_fc_values")
df_calc_values = df_calc_values.filter((col("GJ")==fc_year))
display(df_calc_values)
df_new= df_orig.join(df_calc_values, on = (key_orig1.key_orig == key_map1.key_map) ,how = "left")
I already searched on Google but found no result.
There is no insight into where the original DataFrame (`df_orig`) came from. Also, your for loop is not correctly indented, so it is not possible to tell what is part of the loop and what is not. Your `fc_year` variable is never declared (`fc_year == 2024` is a comparison, not an assignment). Finally, you are importing the whole namespace of the pyspark functions module via `from pyspark.sql.functions import *`,
which you should absolutely avoid, to prevent naming conflicts and similar issues in your code.
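To illustrate why the star import is risky: `pyspark.sql.functions` exports names such as `sum`, `max`, `min`, and `abs`, so `from pyspark.sql.functions import *` silently shadows the Python builtins of the same name. Here is a minimal pure-Python sketch of the same shadowing hazard (no Spark needed; `builtin_sum` is just an alias chosen for the demo):

```python
# Star imports can silently shadow Python builtins. pyspark.sql.functions
# exports names like `sum`, `max`, `min`, and `abs`, so after
# `from pyspark.sql.functions import *` the builtin `sum` is replaced.
# The same hazard, reproduced without Spark:

builtin_sum = sum                # keep a handle on the builtin before shadowing


def sum(col):                    # a module export might reuse this name
    return f"SUM({col})"         # returns a SQL snippet, not a number


print(sum("revenue"))            # → SUM(revenue)  (not the builtin anymore!)
print(builtin_sum([1, 2, 3]))    # → 6  (builtin still reachable via the alias)
```

Using `from pyspark.sql import functions as f` instead keeps every Spark function behind the `f.` prefix, so nothing in the builtin namespace is touched.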
Anyway, here is an attempt at an answer showing how you can build a join expression string from the given information and how to join two DataFrames with it.
from pyspark.sql import functions as f

df_calc_values = spark.read.table("hive_metastore.reporting_datalake.df_calc_fc_values")
df_orig = spark.read.table("original_table from wherever the table came from")

max_fil_no = test.agg(f.max("Fil_Number").alias("max")).first().max
fc_year = 2024

alias_original = "original"
alias_calc_val = "calc_vals"

# filter on the fiscal year once, outside the loop, instead of
# re-filtering (and reassigning) df_calc_values on every iteration
df_calc_values = df_calc_values.filter(f.col("GJ") == fc_year)

dfs = {}
for i in range(1, max_fil_no + 1):
    test_1 = test.filter(f.col("Fil_Number") == i)
    list_key_orig = test_1.select("key_orig").collect()
    list_key_map = test_1.select("key_map").collect()

    # split the comma-separated key strings into lists of column names
    join_cols_original = [x.replace(" ", "") for x in list_key_orig[0].key_orig.split(",")]
    join_cols_calc_vals = [x.replace(" ", "") for x in list_key_map[0].key_map.split(",")]

    # build an "original.col == calc_vals.col AND ..." join expression
    join_str_combos = []
    for key_orig, key_cal_vals in zip(join_cols_original, join_cols_calc_vals):
        join_str_combos.append(f"{alias_original}.{key_orig} == {alias_calc_val}.{key_cal_vals}")
    join_str = " AND ".join(join_str_combos)

    df_new = (
        df_orig.alias(alias_original)
        .join(
            df_calc_values.alias(alias_calc_val),
            on=f.expr(join_str),
            how="left"
        )
    )
    dfs[i] = df_new
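The string-building step can be verified in isolation without a Spark session. Here is a sketch with `build_join_expr` as a hypothetical helper name, fed with the `Fil_Number == 1` row from your `test` DataFrame:

```python
def build_join_expr(key_orig: str, key_map: str,
                    alias_orig: str = "original",
                    alias_calc: str = "calc_vals") -> str:
    """Turn two comma-separated key strings into an aliased join expression."""
    cols_orig = [c.strip() for c in key_orig.split(",")]
    cols_map = [c.strip() for c in key_map.split(",")]
    pairs = [
        f"{alias_orig}.{a} == {alias_calc}.{b}"
        for a, b in zip(cols_orig, cols_map)
    ]
    return " AND ".join(pairs)


# Fil_Number == 1 row: key_orig = 'B_YEAR, B_ORG', key_map = 'YEAR, ORG'
print(build_join_expr("B_YEAR, B_ORG", "YEAR, ORG"))
# → original.B_YEAR == calc_vals.YEAR AND original.B_ORG == calc_vals.ORG
```

The resulting string is exactly what `f.expr(join_str)` receives in the loop above, so you can sanity-check your key mappings this way before running the actual join.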