I am having trouble creating a Pandas UDF that performs a calculation on a pd.Series based on a value in the same row of the underlying Spark DataFrame.
However, the most straightforward solution doesn't seem to be supported by the pandas UDF API.
Take a very simple example like the one below:
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
import pandas as pd
@F.pandas_udf(IntegerType())
def addition(arr: pd.Series, addition: int) -> pd.Series:
    return arr.add(addition)
df = spark.createDataFrame([([1,2,3],10),([4,5,6],20)],["array","addition"])
df.show()
df.withColumn("added", addition(F.col("array"),F.col("addition")))
This throws the following exception on the UDF definition line:
NotImplementedError: Unsupported signature: (arr: pandas.core.series.Series, addition: int) -> pandas.core.series.Series.
Am I tackling this problem the wrong way? I could reimplement the whole "addition" function in native PySpark, but the real function is terribly complex and that would mean an enormous amount of rework.
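For reference, the NotImplementedError comes from the addition: int hint. A Series-to-Series pandas UDF receives every column argument as a pd.Series covering one batch of rows, so a per-row scalar type such as int is not accepted as an input hint. A minimal sketch, assuming the intent is to add each row's addition value to every element of that row's array, takes both columns as Series and pairs them up row by row. The helper name add_per_row is hypothetical, and the block uses only pandas and NumPy so the logic can be checked without a Spark session.

```python
import numpy as np
import pandas as pd


def add_per_row(arr: pd.Series, addition: pd.Series) -> pd.Series:
    """Add each row's scalar `addition` to every element of that row's array.

    Hypothetical helper: both arguments are whole Series (one batch of rows),
    which is the shape a Series-to-Series pandas UDF receives.
    """
    # zip pairs row i of the array column with row i of the scalar column;
    # np.asarray accepts both Python lists and the NumPy arrays Arrow produces
    added = [np.asarray(values) + offset for values, offset in zip(arr, addition)]
    # keep the input index so the output lines up row for row with the batch
    return pd.Series(added, index=arr.index)


batch = add_per_row(
    pd.Series([np.array([1, 2, 3]), np.array([4, 5, 6])]),
    pd.Series([10, 20]),
)
print([list(row) for row in batch])
```

Under the same assumptions, and with Spark 3.x type-hinted pandas UDFs, the function should be wrappable as F.pandas_udf(T.ArrayType(T.LongType()))(add_per_row) (with import pyspark.sql.types as T) and applied to F.col("array") and F.col("addition"). Note the return type is an array of longs, because each row's result is an array rather than the single IntegerType declared in the original example.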
Loading the example, with an extra import of array from the standard library:
import pyspark.sql.types as T
import pyspark.sql.functions as F
import pandas as pd
from array import array
df = spark.createDataFrame([([1,2,3],10),([4,5,6],20)],["array","addition"])
df.show(truncate=False)
print(df.schema.fields)
The output is:
+---------+--------+
|array    |addition|
+---------+--------+
|[1, 2, 3]|10      |
|[4, 5, 6]|20      |
+---------+--------+
[StructField('array', ArrayType(LongType(), True), True), StructField('addition', LongType(), True)]
If you must use a Pandas function to complete your task, one option is to call the Pandas function inside a regular PySpark UDF.
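The arr column is an ArrayType, so it is converted into a Pandas Series inside the UDF:
@F.udf(T.ArrayType(T.LongType()))
def addition_pd(arr, addition):
    # in a regular Python UDF, an ArrayType value arrives as a Python list
    pd_arr = pd.Series(arr)
    # add this row's scalar to every element of the array
    added = pd_arr.add(addition)
    # return a typed array of signed longs to match ArrayType(LongType())
    return array("l", added)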
df = df.withColumn("added", addition_pd(F.col("array"),F.col("addition")))
df.show(truncate=False)
print(df.schema.fields)
This returns:
+---------+--------+------------+
|array |addition|added |
+---------+--------+------------+
|[1, 2, 3]|10 |[11, 12, 13]|
|[4, 5, 6]|20 |[24, 25, 26]|
+---------+--------+------------+
[StructField('array', ArrayType(LongType(), True), True), StructField('addition', LongType(), True), StructField('added', ArrayType(LongType(), True), True)]
However, it is worth noting that, where possible, built-in PySpark functions are recommended over PySpark UDFs, because a Python UDF has to serialize each row out to a Python worker and is opaque to Spark's optimizer (see here).