Search code examples

pandas_udf with pd.Series and other object as arguments

I am having trouble with creating a Pandas UDF that performs a calculation on a pd Series based on a value in the same row of the underlying Spark Dataframe.

However, the most straight forward solution doesn't seem to be supported by the Pandas on Spark API:

A very simple example like below

from pyspark.sql.types import IntegerType

import pyspark.sql.functions as F
import pandas as pd

def addition(arr: pd.Series, addition: int) -> pd.Series:
  return arr.add(addition)

df = spark.createDataFrame([([1,2,3],10),([4,5,6],20)],["array","addition"])

df.withColumn("added", addition(F.col("array"),F.col("addition")))

throws the following exception on the udf definition line

NotImplementedError: Unsupported signature: (arr: pandas.core.series.Series, addition: int) -> pandas.core.series.Series.

Am i tackling this problem in a wrong way? I could reimplement the whole "addition" function in native PySpark, but the real function I am talking about is terribly complex and would mean an enormous amount of rework.


  • Loading the example, adding import array

    from pyspark.sql.types as T
    import pyspark.sql.functions as F
    import pandas as pd
    from array import array
    df = spark.createDataFrame([([1,2,3],10),([4,5,6],20)],["array","addition"])

    The response is,

    |    array|addition|
    |[1, 2, 3]|      10|
    |[4, 5, 6]|      20|
    [StructField('array', ArrayType(LongType(), True), True), StructField('addition', LongType(), True)]

    If you must use a Pandas function to complete your task here is an option for a solution that uses a Pandas function within a PySpark UDF,

    • The Spark DF arr column is ArrayType, convert it into a Pandas Series
    • Apply the Pandas function
    • Then, convert the Pandas Series back to an array
    def addition_pd(arr, addition):
        pd_arr = pd.Series(arr)
        added = pd_arr.add(addition)
        return array("l", added)
    df = df.withColumn("added", addition_pd(F.col("array"),F.col("addition")))


    |array    |addition|added       |
    |[1, 2, 3]|10      |[11, 12, 13]|
    |[4, 5, 6]|20      |[24, 25, 26]|
    [StructField('array', ArrayType(LongType(), True), True), StructField('addition', LongType(), True), StructField('added', ArrayType(LongType(), True), True)]

    However, it is worth stating that when possible it is recommended to use PySpark Functions over the use of PySpark UDF (see here)