Tags: dataframe, apache-spark, pyspark, apache-spark-sql, multiple-columns

Execute the same function on different columns to build rows to append to another table


How could I perform the same operation for 15 columns on a DataFrame?
How could I parallelize the operation?

I have input data that I need to use to update a reference table. There are more columns, but I think these 3 are enough to show what I am trying to do.

Table: input

rowid col1 col2 col3
id1 col1_data1 col2_data1 col3_data1
id2 col1_data2 col2_data2 col3_data2

The reference table contains the value of each corresponding cell, then its md5 hash, and finally the name of the column it came from.

Table: references

col_data md5 ref_name
col1_data1 md5_col1_data1 col1_name
col1_data2 md5_col1_data2 col1_name
col1_data3 md5_col1_data3 col1_name
col2_data1 md5_col2_data1 col2_name
col2_data2 md5_col2_data2 col2_name
col2_data3 md5_col2_data3 col2_name
col3_data1 md5_col3_data1 col3_name
col3_data2 md5_col3_data2 col3_name
col3_data3 md5_col3_data3 col3_name
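For reproducibility, the two example tables above could be built as small DataFrames like this (a minimal sketch, assuming a running SparkSession; the literal values are just the placeholders from the tables):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

input_data = spark.createDataFrame(
    [("id1", "col1_data1", "col2_data1", "col3_data1"),
     ("id2", "col1_data2", "col2_data2", "col3_data2")],
    ["rowid", "col1", "col2", "col3"],
)

references = spark.createDataFrame(
    [("col1_data1", "md5_col1_data1", "col1_name"),
     ("col2_data1", "md5_col2_data1", "col2_name"),
     ("col3_data1", "md5_col3_data1", "col3_name")],
    ["col_data", "md5", "ref_name"],
)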

I created a function similar to the one below. It checks the input table against the reference table and, when new data is found, builds the new reference rows and returns them as a DataFrame, so that the references table can be updated at the end.

def repeatedly_executed_function(input_data, references, col_name):
    """
    input_data: the full input DataFrame
    references: the reference table; values missing from it get a new reference created
    col_name: the name of the column to process in this call
    """
    # ... some code ...
    return partial_df

df_col1 = repeatedly_executed_function(input_data, references, "col1")
df_col2 = repeatedly_executed_function(input_data, references, "col2")
data_to_append = df_col1.union(df_col2)
df_col3 = repeatedly_executed_function(input_data, references, "col3")
data_to_append = data_to_append.union(df_col3)
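The body of the function is elided above; purely for context, here is a minimal sketch of what such a per-column function might look like (my assumption, not the actual code): hash one column, label it, and anti-join against the references that already exist.

from pyspark.sql import functions as F

def repeatedly_executed_function(input_data, references, col_name):
    """Return the reference rows for col_name that are not yet in references."""
    candidates = input_data.select(
        F.col(col_name).alias("col_data"),
        F.md5(F.col(col_name)).alias("md5"),
        F.lit(col_name).alias("ref_name"),
    ).distinct()
    # keep only the values that have no reference yet
    return candidates.join(references, "col_data", "anti")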

I only showed a 3-column example, but there are 15 columns to check.

In the end, the idea is to update the references table with the newly calculated md5 values.

(
     data_to_append.write.format("delta")
     .mode("append")
     .saveAsTable(database_table)
)

Solution

  • No per-column function, no unions, and only 1 shuffle (the anti join).

    • Build the 3 final values (col_data, md5, ref_name) as an array, one array per column of the Input table
    • Unpivot: turn every 1 row of 15 columns into 15 rows of 1 array column (the generated stack expression is shown after the code)
    • Split that single array column into 3 data columns
    • Filter out the rows that already exist in the References table
    • Append the result
    from pyspark.sql import functions as F
    
    cols = ['col1', 'col2',..., 'col15']
    
    # Change Input columns to arrays
    df_input = df_input.select(
        *[F.array(F.col(c), F.md5(c), F.lit(c)).alias(c) for c in cols]
    )
    # Unpivot Input table
    stack_string = ", ".join([f"`{c}`" for c in cols])
    df_input2 = df_input.select(
        F.expr(f"stack({len(cols)}, {stack_string}) as col_data"))
    
    # Make 3 columns from 1 array column
    df_input3 = df_input2.select(
        F.element_at('col_data', 1).alias('col_data'),
        F.element_at('col_data', 2).alias('md5'),
        F.element_at('col_data', 3).alias('ref_name'),
    )
    
    # Keep only rows which don't exist in References table
    data_to_append = df_input3.join(df_references, 'col_data', 'anti')
    
    (
        data_to_append.write.format("delta")
        .mode("append")
        .saveAsTable(database_table)
    )
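For intuition about the unpivot step, this is the stack expression the code builds for the small 3-column example (a sketch; the 15-column case just extends the list). stack(n, e1, ..., en) with n expressions yields n rows of 1 column, which is what turns each wide input row into one row per column.

    cols = ['col1', 'col2', 'col3']
    stack_string = ", ".join([f"`{c}`" for c in cols])
    print(f"stack({len(cols)}, {stack_string}) as col_data")
    # prints: stack(3, `col1`, `col2`, `col3`) as col_data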