apache-spark, pyspark, apache-spark-sql

Join two DFs on column name as key


I have a DF from df.summary()

+-------+-----------+-----------+
|summary|    col1   |    col2   |
+-------+-----------+-----------+
|  count|       1000|       1000|
|   mean|  45.678923|  67.890123|
| stddev|   7.123456|   9.234567|
|    min|      32.45|      54.23|
|    25%|      40.12|      63.45|
|    50%|      45.67|      67.89|
|    75%|      50.23|      72.34|
|    max|      58.90|      87.65|
+-------+-----------+-----------+

Additionally, I have a secondary DF with some custom metrics:

+-------------+-------------+-------------+-------------+-------------+-------------+----------+----------+
|distinct_col1|complete_col1|distinct_col2|complete_col2|distinct_col3|complete_col3|  max_col3|  min_col3|
+-------------+-------------+-------------+-------------+-------------+-------------+----------+----------+
|          989|         1000|         1000|         1000|          540|         1000|2023-10-01|2021-01-01|
+-------------+-------------+-------------+-------------+-------------+-------------+----------+----------+

My question is: how can I join both DFs to get the following output?

+-------+-----------+-----------+-----------+
|summary|    col1   |    col2   |   col3    |
+-------+-----------+-----------+-----------+
|  count|       1000|       1000|       1000|
| dstnct|        989|       1000|        540|
| complt|       1000|       1000|       1000|
|   mean|  45.678923|  67.890123|           |
| stddev|   7.123456|   9.234567|           |
|    min|      32.45|      54.23|2021-01-01 | 
|    25%|      40.12|      63.45|           |
|    50%|      45.67|      67.89|           |
|    75%|      50.23|      72.34|           |
|    max|      58.90|      87.65|2023-10-01 |
+-------+-----------+-----------+-----------+

I’ve tried some queries using spark.sql with DESCRIBE but with no success.


Solution

  • One possibility would be to unpivot your secondary DF on each group of columns belonging to col1, col2, and col3 individually. Note that when you unpivot, all of the value columns must share the same type (because they all end up in the same column), which is why I have a try/except block in the loop to cast the dates and numbers from col3 to strings (there is a small illustration of this after the first output below).

    # additional imports you may want to have
    import pyspark.sql.functions as F
    from pyspark.sql.utils import AnalysisException
    from functools import reduce
    
    df_custom_list = []
    
    for col_name in ['col1','col2','col3']:
        selected_cols = [col for col in df_custom.columns if col_name in col]
        
        try:
            df_custom_list.append(
                df_custom.select(
                    selected_cols
                ).unpivot(
                    ids=[], values=selected_cols, variableColumnName="variable", valueColumnName=col_name
                ).withColumn(
                    'summary', F.split('variable','_').getItem(0)
                ).select(
                    'summary',
                    col_name
                )
            )
    
        ## AnalysisException when a column has mixed types
        except AnalysisException:
            df_custom_list.append(
                df_custom.select(
                    [F.col(c).cast("string") for c in selected_cols]
                ).unpivot(
                    ids=[], values=selected_cols, variableColumnName="variable", valueColumnName=col_name
                ).withColumn(
                    'summary', F.split('variable','_').getItem(0)
                ).select(
                    'summary',
                    col_name
                )
            )
    
    def join_reduce(left,right):
        return left.join(right,on='summary',how='outer')
    
    df_custom_long = reduce(join_reduce, df_custom_list)
    

    This gives us the following "long" version of your secondary df with custom metrics:

    +--------+------+------+----------+
    | summary|  col1|  col2|      col3|
    +--------+------+------+----------+
    |complete|1000.0|1000.0|    1000.0|
    |distinct| 989.0|1000.0|     540.0|
    |     max|  NULL|  NULL|2023-10-01|
    |     min|  NULL|  NULL|2021-01-01|
    +--------+------+------+----------+
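
    For context, unpivot requires all of the value columns to share a common type, and unpivoting a numeric column together with a date column is exactly what triggers the AnalysisException handled in the loop above. Here is a minimal toy sketch of that behaviour, reusing the imports from the main block; the frame, column names, and values are made up for illustration, and it assumes an active SparkSession named spark and PySpark 3.4+ (where DataFrame.unpivot exists):

    import datetime

    toy = spark.createDataFrame(
        [(5, datetime.date(2023, 10, 1))],
        ["min_colx", "max_colx"],
    )

    try:
        # bigint and date have no common type, so this raises AnalysisException
        toy.unpivot(
            ids=[], values=["min_colx", "max_colx"],
            variableColumnName="variable", valueColumnName="colx"
        ).show()
    except AnalysisException:
        # same workaround as in the loop above: cast everything to string first
        toy.select(
            [F.col(c).cast("string") for c in toy.columns]
        ).unpivot(
            ids=[], values=["min_colx", "max_colx"],
            variableColumnName="variable", valueColumnName="colx"
        ).show()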
    

    Then we can join df_custom_long back to the summary dataframe, renaming its columns with a _custom suffix so they don't clash with the summary columns, and use coalesce to pick the non-null entries of col1 and col2 (the numbers below come from the sample data I used to test this, so they won't match your exact values):

    # rename the custom metric columns to avoid clashing with df_summary's columns
    df_custom_renamed = df_custom_long.select(
        'summary',
        *[F.col(c).alias(f'{c}_custom') for c in ['col1', 'col2', 'col3']]
    )

    df_summary.join(
        df_custom_renamed, on=['summary'], how='outer'
    ).select(
        'summary',
        F.coalesce('col1', 'col1_custom').alias('col1'),
        F.coalesce('col2', 'col2_custom').alias('col2'),
        F.col('col3_custom').alias('col3')
    )
    
    +--------+--------------------+--------------------+----------+
    | summary|                col1|                col2|      col3|
    +--------+--------------------+--------------------+----------+
    |     25%|  0.2322126951468173| 0.24069271183250496|      NULL|
    |     50%| 0.49636624720123634|  0.5184080219452041|      NULL|
    |     75%|   0.743946292572677|   0.760289823122135|      NULL|
    |complete|              1000.0|              1000.0|    1000.0|
    |   count|                1000|                1000|      NULL|
    |distinct|               989.0|              1000.0|     540.0|
    |     max|  0.9997176732861306|  0.9994137257706666|2023-10-01|
    |    mean|  0.4902565533201336|  0.5070173051792036|      NULL|
    |     min|0.004632023004602859|0.003218263604278...|2021-01-01|
    |  stddev|  0.2921373619474861|  0.2921898934031558|      NULL|
    +--------+--------------------+--------------------+----------+
    

    Depending on your use case, you may need to redefine the types in your schema afterwards: because of the mixed data types and the casting of dates and floats to strings, you may end up with all columns as strings.
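
    A minimal sketch of that last step, assuming the combined frame above was assigned to a variable named df_result (a hypothetical name) and that col1 and col2 should end up numeric again:

    df_typed = df_result.select(
        'summary',
        F.col('col1').cast('double').alias('col1'),
        F.col('col2').cast('double').alias('col2'),
        # col3 stays a string, since it mixes dates and counts
        F.col('col3')
    )
    df_typed.printSchema()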