Tags: pyspark, user-defined-functions, data-cleaning, pyspark-pandas

How to create this function in PySpark?


I have a large data frame, consisting of 400+ columns and 14,000+ records, that I need to clean. I have written a Python function to do this, but due to the size of my dataset I need to use PySpark. However, I am very unfamiliar with PySpark and don't know how to recreate the Python function in it.

This is the function in Python:

unwanted_characters = ['[', ',', '-', '#', '@', ' ']
cols = df.columns.to_list()

def clean_col(item):
    # Read the current column's value from the row and truncate it
    # at the first occurrence of any unwanted character.
    column = str(item.loc[col])

    for character in unwanted_characters:
        if character in column:
            character_index = column.find(character)
            column = column[:character_index]

    return column


for col in cols:
    df[col] = df.apply(clean_col, axis=1)

This function works in Python, but I cannot apply it to 400+ columns.

I have tried to convert this function to PySpark:

from pyspark.sql.functions import udf, col

clean_colUDF = udf(lambda z: clean_col(z))

df.select(col("Name"),
          clean_colUDF(col("Name")).alias("Name")) \
  .show(truncate=False)

But when I run it I get the error: AttributeError: 'str' object has no attribute 'loc'

Does anyone know how I would modify this so that it works in PySpark? My columns' data types are a mix of integers and strings, so I need it to work on both.


Solution

  • Use the built-in pyspark.sql.functions wherever possible: they provide a ready-made, performant toolkit that should cover 95% of any data transformation requirement without having to implement your own custom UDFs.

    pyspark.sql.functions docs: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

    For what you want to do I would start with regexp_replace(): https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.regexp_replace.html#pyspark.sql.functions.regexp_replace
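
    As a rough illustration of that approach (a minimal sketch, assuming the goal is to truncate each value at the first occurrence of any unwanted character, and using a small made-up DataFrame in place of your real one):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical sample data standing in for the real 400+ column DataFrame.
    df = spark.createDataFrame(
        [("abc[def", 12), ("foo-bar", 34), ("clean", 56)],
        ["Name", "Code"],
    )

    # Drop everything from the first unwanted character ('[', ',', '-', '#', '@' or space)
    # to the end of the value, mirroring the truncation done in the Python loop.
    pattern = r"[\[,\-#@ ].*"

    cleaned = df.select(
        [F.regexp_replace(F.col(c).cast("string"), pattern, "").alias(c) for c in df.columns]
    )
    cleaned.show(truncate=False)

    Note that the cast("string") is there because regexp_replace operates on string columns; it will turn the integer columns into strings, so either restrict the loop to the string columns (e.g. by inspecting df.dtypes) or cast those columns back afterwards if you need to keep their original types.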