
Apply a different transformation to each DataFrame column


I have a PySpark DataFrame (records_002_df) in the format below, and more columns will be added in the future:

+-----------+-----------------+-------------+
|RECORD_TYPE|     CLAIM_NUMBER|RECEIVED_DATE|
+-----------+-----------------+-------------+
|        002|     23E002113200|   08/30/2023|
|        002|     23P001125500|   05/30/2023|
|        002|     23E002114300|   01/30/2024|
|        002|20223124002830199|   12/31/2022|
|        002|20223124003270199|   12/31/2022|
|        002|20223493004410199|   12/31/2022|
+-----------+-----------------+-------------+

I need to apply a separate transformation to each column. For one of them I tried the following:

from pyspark.sql.functions import when

trans_df = records_002_df.withColumn('RECORD_TYPE', when(records_002_df['RECORD_TYPE'] == '002', 'In-Network').otherwise('Out-Of-Network'))

This gives me the expected DataFrame with the required transformation on the RECORD_TYPE field, but I need to apply different transformations to the other columns. I also want to keep the transformation logic in a separate module so that the Spark script stays generic. How can I achieve this? Thanks!


Solution

  • Put the transformations into a dictionary with

    • key: the wanted column name
    • value: a Column expression that contains the transformation.

    In the column expressions, do not reference the dataframe directly (as in records_002_df['RECORD_TYPE']); refer to the input data only by its column names, for example with F.col("RECORD_TYPE").

    from pyspark.sql import functions as F
    
    transformations = {"RECORD_TYPE": F.when(F.col("RECORD_TYPE") == '002','In-Network')
                                                              .otherwise('Out-Of-Network'),
                       "NEXT_DAY": F.date_add(F.to_date("RECEIVED_DATE", "MM/dd/yyyy"), 1)
    }
    

    This dict can now be applied to the dataframe:

    records_002_df.select([F.col(name) for name in records_002_df.columns - transformations.keys()]  # unchanged columns (set difference, so their order is not guaranteed)
               + [transformation.alias(name) for (name, transformation)  # transformed columns
                  in transformations.items()]) \
             .show()
    

    Result:

    +-------------+------------+-----------+----------+
    |RECEIVED_DATE|CLAIM_NUMBER|RECORD_TYPE|  NEXT_DAY|
    +-------------+------------+-----------+----------+
    |   08/30/2023|23E002113200| In-Network|2023-08-31|
    |   05/30/2023|23P001125500| In-Network|2023-05-31|
    |   01/30/2024|23E002114300| In-Network|2024-01-31|
    +-------------+------------+-----------+----------+
    

    The transformations dict does not have any references to the original dataframe, so its creation can be moved to another module.