I would like to build a PySpark DataFrame dynamically, using a lambda, from a given list of dictionaries. Below is my list.
cols = [
{'Technical Label':'UserID', 'Column Mapping':'UID','Technical Column':'No'},
{'Technical Label':'Type', 'Column Mapping':'X_TYPE','Technical Column':'No'},
{'Technical Label':'Name.1', 'Column Mapping':'NaturalName','Technical Column':'No'},
{'Technical Label':'Address.1', 'Column Mapping':'HomeAddress','Technical Column':'Yes'},
{'Technical Label':'Identifier.1', 'Column Mapping':'Human','Technical Column':'Yes'},
{'Technical Label':'Identifier.1', 'Column Mapping':'EX_CO','Technical Column':'Yes'},
{'Technical Label':'Identifier.IdentifierValue.1', 'Column Mapping':'X_CODE','Technical Column':'No'}
]
I want something like this:
refdf = df.withColumn('UserID', df.UID).withColumn('Type', df.X_TYPE).withColumn('Name.1', df.NaturalName).withColumn('Address.1', lit('Address')) ... and so on
I want to decide based on the 'Technical Column' key of each dictionary. If the value is No, I want to take the value from the DataFrame column named in 'Column Mapping'. If the value is Yes, I want to use lit() on the 'Column Mapping' value itself.
I wrote the code below but got an error.
from functools import reduce
from pyspark.sql.functions import col, lit
df = spark.read.format('csv').option('header','true').option('delimiter','|').load('/my/path/file.txt')
x = reduce(lambda df,colsm : df.withColumn(colsm['Technical Label'], (lambda x: x['Column Mapping'] if x['Technical Column'] == 'No' else lit(x['Column Mapping']) , cols)), cols, df)
display(x)
Error:
PySparkTypeError: [NOT_COLUMN] Argument `col` should be a Column, got tuple.
I cannot figure out what is wrong.
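The error occurs because the second argument you pass to withColumn is the tuple (lambda ..., cols). The inner lambda is never called; it is only paired with cols inside the parentheses, and withColumn expects a Column expression there. Even if the lambda were called, its 'No' branch would return a plain string rather than col(...), which withColumn also rejects. Move the decision into a helper that always returns a Column: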
def create_column_expression(col_dict):
    if col_dict['Technical Column'] == 'No':
        return col(col_dict['Column Mapping']).alias(col_dict['Technical Label'])
    else:
        return lit(col_dict['Column Mapping']).alias(col_dict['Technical Label'])
# Apply the transformations
x = reduce(lambda df, col_dict: df.withColumn(col_dict['Technical Label'], create_column_expression(col_dict)), cols, df)
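To see why the original expression produced a tuple, here is a small Spark-free sketch. It is only an illustration: pick is a hypothetical stand-in for your inner lambda, and the ('col', ...) / ('lit', ...) pairs stand in for col(...) and lit(...) so the snippet runs without a Spark session.

```python
# Spark-free sketch: plain tuples stand in for col(...) and lit(...).
cols = [
    {'Technical Label': 'UserID', 'Column Mapping': 'UID', 'Technical Column': 'No'},
    {'Technical Label': 'Address.1', 'Column Mapping': 'HomeAddress', 'Technical Column': 'Yes'},
]

def pick(x):
    # Hypothetical stand-in for the inner lambda: tags each mapping as 'col' or 'lit'.
    if x['Technical Column'] == 'No':
        return ('col', x['Column Mapping'])
    return ('lit', x['Column Mapping'])

# Same shape as the original second argument: "(function, list)" in parentheses.
# Python builds a tuple here; the function is not called.
not_called = (pick, cols)
print(type(not_called).__name__)  # tuple

# Calling the function on each entry is what yields one value per mapping.
called = [pick(entry) for entry in cols]
print(called)  # [('col', 'UID'), ('lit', 'HomeAddress')]
```

Separately, your cols list contains 'Identifier.1' twice. Since withColumn replaces an existing column of the same name, the later entry ('EX_CO') would overwrite the earlier one ('Human') when the reduce runs, so you may want distinct labels there.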