Tags: python, apache-spark, pyspark, user-defined-functions

How to update pyspark dataframe inside a Python function


I have a Python function that receives a pyspark dataframe and checks if it has all the columns expected by other functions used in a script. In particular, if the column 'weight' is missing, I want to update the dataframe passed by the user by assigning a new column to it.

For example:

from pyspark.sql import functions as F

def verify_cols(df):
    if 'weight' not in df.columns:
        df = df.withColumn('weight', F.lit(1))  # Can I update `df` inside this function?

As you can see, I want the function to update df. How can I achieve this? If possible, I would like to avoid using a return statement.

This post is very similar but uses pandas' inplace argument.


Solution

  • To avoid the return statement, you can declare a class and keep the df as a member field. Note that Spark dataframes are immutable: withColumn returns a new dataframe, so reassigning the local df inside the function only rebinds the local name and never changes the caller's variable. Storing the dataframe on an object gives the method a place to write the replacement that the caller can see.

    from pyspark.sql import functions as F
    from pyspark.sql import DataFrame

    class Validator:
        def __init__(self, df: DataFrame):
            self.df = df

        def verify_cols(self):
            # Rebind the member field so the updated dataframe is visible to the caller
            if 'weight' not in self.df.columns:
                self.df = self.df.withColumn('weight', F.lit(1))
    

    After calling the verify_cols method, the df field will be updated.
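
    For example, a minimal usage sketch (the SparkSession setup and the toy dataframe here are assumptions for illustration, not part of the original question):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # A toy dataframe that is deliberately missing the 'weight' column
    df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'label'])

    validator = Validator(df)
    validator.verify_cols()   # adds 'weight' because it is missing
    validator.df.show()       # validator.df now contains id, label, weight

    The caller reads the (possibly replaced) dataframe back from validator.df, so no value needs to be returned from verify_cols.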