How could I perform the same operation for 15 columns on a DataFrame?
How could I parallelize the operation?
I have input data that I need to use to update a reference table. There are more columns, but I think these 3 are enough to show what I am trying to do.
Table: input
rowid | col1 | col2 | col3 |
---|---|---|---|
id1 | col1_data1 | col2_data1 | col3_data1 |
id2 | col1_data2 | col2_data2 | col3_data2 |
The reference table contains the value of each corresponding cell of a column, then its md5 hash, and finally the column name.
Table: references
col_data | md5 | ref_name |
---|---|---|
col1_data1 | md5_col1_data1 | col1_name |
col1_data2 | md5_col1_data2 | col1_name |
col1_data3 | md5_col1_data3 | col1_name |
col2_data1 | md5_col2_data1 | col2_name |
col2_data2 | md5_col2_data2 | col2_name |
col2_data3 | md5_col2_data3 | col2_name |
col3_data1 | md5_col3_data1 | col3_name |
col3_data2 | md5_col3_data2 | col3_name |
col3_data3 | md5_col3_data3 | col3_name |
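To make the layout concrete, the two tables can be reproduced as small DataFrames (a minimal sketch using the sample rows above; spark is assumed to be an active SparkSession):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample input table
input_data = spark.createDataFrame(
    [("id1", "col1_data1", "col2_data1", "col3_data1"),
     ("id2", "col1_data2", "col2_data2", "col3_data2")],
    ["rowid", "col1", "col2", "col3"],
)
# sample references table
references = spark.createDataFrame(
    [("col1_data1", "md5_col1_data1", "col1_name"),
     ("col2_data1", "md5_col2_data1", "col2_name"),
     ("col3_data1", "md5_col3_data1", "col3_name")],
    ["col_data", "md5", "ref_name"],
)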
I created a function similar to this one that checks the input table against the reference table; when new data is found, the reference is created and a DataFrame is returned, so that at the end the references table can be updated.
def repeatedly_executed_function(input_data, references, col_name):
    """
    input_data is the full dataframe
    references is the table to check whether it already has the value and, if not, create it
    col_name is the name of the column that will be considered in the execution
    """
    # ... some code ...
    return partial_df
df_col1 = repeatedly_executed_function(input_data, references, "col1")
df_col2 = repeatedly_executed_function(input_data, references, "col2")
data_to_append = df_col1.union(df_col2)
df_col3 = repeatedly_executed_function(input_data, references, "col3")
data_to_append = data_to_append.union(df_col3)
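The repeated calls could at least be collapsed into a loop and a single reduce over the partial results (a sketch of the current approach only, assuming each call returns a DataFrame with the same schema):

from functools import reduce
from pyspark.sql import DataFrame

cols_to_check = ["col1", "col2", "col3"]  # ... and so on up to col15
partials = [repeatedly_executed_function(input_data, references, c) for c in cols_to_check]
data_to_append = reduce(DataFrame.union, partials)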
I only put a 3-column example here, but there are 15 columns to check.
At the end, the idea is to update the references table with the newly calculated md5 values.
(
    data_to_append.write.format("delta")
    .mode("append")
    .saveAsTable(database_table)
)
No function, no unions. 1 shuffle (anti join).
from pyspark.sql import functions as F
cols = ['col1', 'col2', ..., 'col15']
# Change Input columns to arrays
df_input = df_input.select(
    *[F.array(F.col(c), F.md5(c), F.lit(c)).alias(c) for c in cols]
)
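# each column now holds an array: [cell value, md5(cell value), column name]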
# Unpivot Input table
stack_string = ", ".join([f"`{c}`" for c in cols])
df_input2 = df_input.select(
    F.expr(f"stack({len(cols)}, {stack_string}) as col_data")
)
# Make 3 columns from 1 array column
df_input3 = df_input2.select(
    F.element_at('col_data', 1).alias('col_data'),
    F.element_at('col_data', 2).alias('md5'),
    F.element_at('col_data', 3).alias('ref_name'),
)
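As a side note, the array step and the unpivot could probably be fused: stack can emit the three target columns directly from the original (pre-array) df_input via selectExpr (a sketch, assuming all 15 columns are string-typed):

stack_exprs = ", ".join([f"`{c}`, md5(`{c}`), '{c}'" for c in cols])
df_input3 = df_input.selectExpr(
    f"stack({len(cols)}, {stack_exprs}) as (col_data, md5, ref_name)"
)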
# Keep only rows which don't exist in References table
data_to_append = df_input3.join(df_references, 'col_data', 'anti')
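Here df_references is assumed to be the existing references table loaded as a DataFrame. If it is small enough to fit on the executors, a broadcast hint on it should remove even that one shuffle (a sketch):

data_to_append = df_input3.join(
    F.broadcast(df_references), 'col_data', 'anti'
)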
(
    data_to_append.write.format("delta")
    .mode("append")
    .saveAsTable(database_table)
)