I have a dataframe which consists of two columns. I am using a function as udf and running that function using applyInPandas in pyspark.
Below is the code
import pandas as pd
from pyspark.sql.functions import pandas_udf, ceil
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def normalize(pdf):
v = pdf.v
return pdf.assign(v=(v - v.mean()) / v.std())
df.groupby("id").applyInPandas(
normalize, schema="id long, v double").show()
I have to pass one more parameter to the normalize function which acts as a udf. When I am passing a parameter I am getting an error
below is the code for that
import pandas as pd
from pyspark.sql.functions import pandas_udf, ceil
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def normalize(pdf,value):
v = pdf.v - value
return pdf.assign(v=(v - v.mean()) / v.std())
df.groupby("id").applyInPandas(
normalize(1), schema="id long, v double").show()
Error:
TypeError: normalize() missing 1 required positional argument: 'value'
How to resolve this and pass parameters to the udf using applyInPandas?
Wrap your normalize
function like this:
def normalize_wrap(value):
def normalize(pdf):
v = pdf.v - value
return pdf.assign(v=(v - v.mean()) / v.std())
return normalize
Result:
+---+-------------------+
| id| v|
+---+-------------------+
| 1|-0.7071067811865475|
| 1| 0.7071067811865475|
| 2|-0.8320502943378437|
| 2|-0.2773500981126146|
| 2| 1.1094003924504583|
+---+-------------------+