pandas, pyspark, pyspark-pandas

pandas_udf with pd.Series and other object as arguments


I am having trouble creating a Pandas UDF that performs a calculation on a pd.Series based on a value in the same row of the underlying Spark DataFrame.

However, the most straightforward solution doesn't seem to be supported by the Pandas on Spark API:

A very simple example like the one below

from pyspark.sql.types import IntegerType

import pyspark.sql.functions as F
import pandas as pd

@F.pandas_udf(IntegerType())
def addition(arr: pd.Series, addition: int) -> pd.Series:
  return arr.add(addition)

df = spark.createDataFrame([([1,2,3],10),([4,5,6],20)],["array","addition"])
df.show()

df.withColumn("added", addition(F.col("array"),F.col("addition")))

throws the following exception on the udf definition line

NotImplementedError: Unsupported signature: (arr: pandas.core.series.Series, addition: int) -> pandas.core.series.Series.

Am I tackling this problem the wrong way? I could reimplement the whole "addition" function in native PySpark, but the real function I am talking about is terribly complex, and rewriting it would mean an enormous amount of rework.


Solution

  • Load the example, adding an import for array:

    import pyspark.sql.types as T
    import pyspark.sql.functions as F
    import pandas as pd
    from array import array
    
    df = spark.createDataFrame([([1,2,3],10),([4,5,6],20)],["array","addition"])
    df.show(truncate=False)
    print(df.schema.fields)
    

    The output is:

    +---------+--------+
    |array    |addition|
    +---------+--------+
    |[1, 2, 3]|10      |
    |[4, 5, 6]|20      |
    +---------+--------+
    
    [StructField('array', ArrayType(LongType(), True), True), StructField('addition', LongType(), True)]
    

    If you must use a Pandas function for this task, one option is to call the Pandas function inside a regular PySpark UDF:

    • The Spark DataFrame's array column is an ArrayType; convert each row's value into a Pandas Series
    • Apply the Pandas function
    • Then convert the Pandas Series back to an array

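    @F.udf(T.ArrayType(T.LongType()))
    def addition_pd(arr, addition):
        # A regular UDF is called once per row: arr is that row's list of values
        pd_arr = pd.Series(arr)
        # Apply the Pandas function with the row's scalar value
        added = pd_arr.add(addition)
        # Pack the result as signed longs for the ArrayType(LongType()) return type
        return array("l", added)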
    
    df = df.withColumn("added", addition_pd(F.col("array"),F.col("addition")))
    df.show(truncate=False)
    print(df.schema.fields)
    

    Returns

    +---------+--------+------------+
    |array    |addition|added       |
    +---------+--------+------------+
    |[1, 2, 3]|10      |[11, 12, 13]|
    |[4, 5, 6]|20      |[24, 25, 26]|
    +---------+--------+------------+
    
    [StructField('array', ArrayType(LongType(), True), True), StructField('addition', LongType(), True), StructField('added', ArrayType(LongType(), True), True)]
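
As for the original NotImplementedError: as far as I know, the type hints of a pandas_udf have to be pd.Series or pd.DataFrame (or iterators of them), so a plain `int` hint is rejected. Every column, including the scalar one, reaches the function as a Series holding one value per row of the batch. Below is a minimal sketch of that variant for this example. The names `add_per_row` and `with_added` are made up for illustration, and it assumes pyarrow is installed and that neither column contains nulls.

```python
import pandas as pd


def add_per_row(arr: pd.Series, addition: pd.Series) -> pd.Series:
    """Add each row's `addition` value to every element of that row's array.

    Both arguments cover one batch of rows: `arr` holds one array per row,
    `addition` holds one integer per row.
    """
    return pd.Series(
        [pd.Series(values).add(offset).tolist() for values, offset in zip(arr, addition)]
    )


def with_added(df):
    """Hypothetical helper: attach the `added` column using a pandas_udf."""
    # pyspark is imported here so the pandas logic above can be tested without Spark.
    import pyspark.sql.functions as F
    import pyspark.sql.types as T

    # The Series -> Series type hints on add_per_row mark it as a scalar pandas_udf.
    add_udf = F.pandas_udf(add_per_row, returnType=T.ArrayType(T.LongType()))
    return df.withColumn("added", add_udf(F.col("array"), F.col("addition")))
```

Calling `with_added(df)` on the example DataFrame should give the same `added` column as the regular UDF above. Keeping the Pandas logic in a plain function also means it can be checked with ordinary Series before Spark is involved.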
    

    However, when possible, prefer built-in PySpark functions over PySpark UDFs (see here).
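
For this toy example, the native route could look like the sketch below, which uses the higher-order function `F.transform` (available in Spark 3.1 and later, to my knowledge) to add the row's `addition` value to each array element without any UDF. The helper name `add_natively` is made up for illustration, and whether the real, more complex function can be expressed this way is a separate question.

```python
def add_natively(df):
    """Hypothetical helper: add the `addition` column to each element of `array`."""
    # Imported inside the function so this sketch only needs pyspark when it is called.
    import pyspark.sql.functions as F

    # The lambda receives each array element as a Column and may reference
    # other columns of the same row, such as `addition`.
    return df.withColumn(
        "added",
        F.transform("array", lambda x: x + F.col("addition")),
    )
```

Usage would be `add_natively(df).show(truncate=False)` on the DataFrame created above.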