
How to pass parameters to functions using applyInPandas in PySpark?


I have a DataFrame that consists of two columns. I am using a function as a UDF and running it with applyInPandas in PySpark.

Below is the working code:

import pandas as pd  
from pyspark.sql.functions import pandas_udf, ceil
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())
df.groupby("id").applyInPandas(
    normalize, schema="id long, v double").show() 

I need to pass one more parameter to the normalize function, which acts as the UDF. When I pass a parameter, I get an error.

Below is the code for that:

import pandas as pd  
from pyspark.sql.functions import pandas_udf, ceil
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
def normalize(pdf,value):
    v = pdf.v - value
    return pdf.assign(v=(v - v.mean()) / v.std())
df.groupby("id").applyInPandas(
    normalize(1), schema="id long, v double").show()

Error:

TypeError: normalize() missing 1 required positional argument: 'value'

How can I resolve this and pass parameters to the UDF when using applyInPandas?


Solution

  • applyInPandas expects a function object, but normalize(1) calls normalize immediately (binding pdf=1 and leaving value unfilled), which is why the TypeError is raised. Wrap your normalize function in an outer function that captures the extra parameter as a closure, like this:

    def normalize_wrap(value):
        def normalize(pdf):
            v = pdf.v - value
            return pdf.assign(v=(v - v.mean()) / v.std())
        return normalize

    df.groupby("id").applyInPandas(
        normalize_wrap(1), schema="id long, v double").show()

    Result:

    +---+-------------------+
    | id|                  v|
    +---+-------------------+
    |  1|-0.7071067811865475|
    |  1| 0.7071067811865475|
    |  2|-0.8320502943378437|
    |  2|-0.2773500981126146|
    |  2| 1.1094003924504583|
    +---+-------------------+
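
  • As an aside (not part of the original answer): instead of a hand-written closure, the extra argument can also be bound with functools.partial from the standard library. applyInPandas only needs a callable that accepts a single pandas DataFrame, and a partial object satisfies that. A minimal sketch of the same idea:

    from functools import partial

    def normalize(pdf, value):
        # subtract the bound constant, then standardize within the group
        v = pdf.v - value
        return pdf.assign(v=(v - v.mean()) / v.std())

    # partial(normalize, value=1) returns a callable that takes only pdf
    df.groupby("id").applyInPandas(
        partial(normalize, value=1), schema="id long, v double").show()

    This produces the same output as the closure approach, since subtracting a constant before standardizing does not change (v - v.mean()) / v.std().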