I have a PySpark dataframe (records_002_df) in the format below, and more columns will be added in the future:
+-----------+-----------------+-------------+
|RECORD_TYPE|     CLAIM_NUMBER|RECEIVED_DATE|
+-----------+-----------------+-------------+
|        002|     23E002113200|   08/30/2023|
|        002|     23P001125500|   05/30/2023|
|        002|     23E002114300|   01/30/2024|
|        002|20223124002830199|   12/31/2022|
|        002|20223124003270199|   12/31/2022|
|        002|20223493004410199|   12/31/2022|
+-----------+-----------------+-------------+
I need to apply a separate transformation to each column. For one of them I tried the following:
from pyspark.sql.functions import when
trans_df = records_002_df.withColumn('RECORD_TYPE', when(records_002_df['RECORD_TYPE'] == '002', 'In-Network').otherwise('Out-Of-Network'))
This gives me the expected dataframe with the required transformation on the RECORD_TYPE field. However, the other columns need different transformations, and I also want to keep the transformation logic in a separate module so that the Spark script stays generic. Please suggest some ideas for how I can achieve this. Thanks!
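Put the transformations into a dictionary that maps each output column name to its column expression.
The column expressions should not reference the dataframe directly (as in records_002_df['RECORD_TYPE']); instead, refer to the input data by column name (as in F.col("RECORD_TYPE")).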
from pyspark.sql import functions as F

transformations = {
    # overwrite an existing column
    "RECORD_TYPE": F.when(F.col("RECORD_TYPE") == '002', 'In-Network')
                    .otherwise('Out-Of-Network'),
    # derive a new column from an existing one
    "NEXT_DAY": F.date_add(F.to_date("RECEIVED_DATE", "MM/dd/yyyy"), 1),
}
This dict can now be applied to the dataframe:
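records_002_df.select(
    # unchanged columns: list minus dict keys yields a set, so their order is not preserved
    [F.col(name) for name in records_002_df.columns - transformations.keys()]
    # transformed and new columns, named after their dict keys
    + [transformation.alias(name) for (name, transformation) in transformations.items()]
).show()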
Result:
+-------------+-------------+-----------+----------+
|RECEIVED_DATE| CLAIM_NUMBER|RECORD_TYPE|  NEXT_DAY|
+-------------+-------------+-----------+----------+
|   08/30/2023| 23E002113200| In-Network|2023-08-31|
|   05/30/2023|J23P001125500| In-Network|2023-05-31|
|   01/30/2024| 23E002114300| In-Network|2024-01-31|
+-------------+-------------+-----------+----------+
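Because the unchanged columns come from a set difference, their order in the result is arbitrary (here RECEIVED_DATE ended up before CLAIM_NUMBER). If column order matters, a small helper can walk the dataframe's columns in order instead. This is a sketch with a hypothetical function name, apply_transformations; it only relies on df.columns, df[name], df.select(list_of_columns) and Column.alias, which a PySpark DataFrame provides.

```python
from collections.abc import Mapping
from typing import Any


def apply_transformations(df: Any, transformations: Mapping[str, Any]) -> Any:
    """Apply a {column name: Column expression} mapping to a PySpark DataFrame.

    Columns already in `df` keep their original position; those listed in
    `transformations` are replaced in place by the aliased expression.
    Keys that are not existing column names are appended as new columns,
    in the dict's insertion order.
    """
    existing = list(df.columns)
    selected = [
        # transformed in place, or passed through unchanged
        transformations[name].alias(name) if name in transformations else df[name]
        for name in existing
    ]
    # brand-new columns (e.g. NEXT_DAY) go at the end
    selected += [
        expr.alias(name)
        for name, expr in transformations.items()
        if name not in existing
    ]
    return df.select(selected)
```

With the data from the question, apply_transformations(records_002_df, transformations).show() should keep RECORD_TYPE, CLAIM_NUMBER and RECEIVED_DATE in their original positions (RECORD_TYPE transformed in place) and append NEXT_DAY at the end.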
The transformations dict does not have any references to the original dataframe, so its creation can be moved to another module.
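To keep the Spark script generic, one option is to have it import the transformation module by name (for example, a name passed on the command line) instead of hard-coding it. The sketch below assumes that module exposes a factory function, hypothetically named get_transformations, that builds and returns the dict. Building it inside a function rather than at module level is deliberate: in classic PySpark, creating Column objects such as F.col(...) requires an active SparkContext, so a module-level dict can fail if the module is imported before the SparkSession exists.

```python
import importlib
from collections.abc import Callable, Mapping
from typing import Any

# Example contents of the transformation module (hypothetical file claim_transformations.py):
#
#     from pyspark.sql import functions as F
#
#     def get_transformations():
#         return {
#             "RECORD_TYPE": F.when(F.col("RECORD_TYPE") == "002", "In-Network")
#                             .otherwise("Out-Of-Network"),
#             "NEXT_DAY": F.date_add(F.to_date("RECEIVED_DATE", "MM/dd/yyyy"), 1),
#         }


def load_transformations(module_name: str,
                         factory_name: str = "get_transformations") -> Mapping[str, Any]:
    """Import `module_name` and return the {column name: Column expression}
    mapping produced by its factory function.

    Call this only after the SparkSession has been created, because the
    factory builds PySpark Column objects.
    """
    module = importlib.import_module(module_name)
    factory: Callable[[], Mapping[str, Any]] = getattr(module, factory_name)
    transformations = factory()
    if not isinstance(transformations, Mapping):
        raise TypeError(
            f"{module_name}.{factory_name}() must return a mapping, "
            f"got {type(transformations).__name__}"
        )
    return transformations
```

The driver script could then call load_transformations(sys.argv[1]) after creating the SparkSession and pass the result to the same select as in the answer, so adding a new column transformation only touches the transformation module.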