Search code examples
Tags: apache-spark, dataframe, pyspark, user-defined-functions, broadcast

How to reference a dataframe from inside a UDF running on another dataframe?


How do you reference a pyspark dataframe during the execution of a UDF on another dataframe?

Here's a dummy example. I create two dataframes, scores and lastnames, each of which contains a student_id column shared across the two dataframes. In the UDF applied to scores, I want to filter lastnames on that id and return the last-name string found there.

# Spark bootstrap for a local run: a SparkContext plus the SQLContext
# used below to build DataFrames.  (SparkConf is imported but never
# used; the wildcard types import brings in StructType & co.)
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local")
sqlCtx = SQLContext(sc)


# Build synthetic (student_id, subject, score) records — one row per
# student/subject pair.  Seeding the RNG keeps the scores reproducible.
import itertools
import random

student_ids = ['student1', 'student2', 'student3']
subjects = ['Math', 'Biology', 'Chemistry', 'Physics']
random.seed(1)

data = [
    (student_id, subject, random.randint(0, 100))
    for student_id, subject in itertools.product(student_ids, subjects)
]

# Explicit schema for the scores table.  (This import is redundant —
# the earlier wildcard import already provides these names — but it is
# harmless and kept for clarity.)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
            StructField("student_id", StringType(), nullable=False),
            StructField("subject", StringType(), nullable=False),
            StructField("score", IntegerType(), nullable=False)
    ])

# Create DataFrame 
rdd = sc.parallelize(data)
scores = sqlCtx.createDataFrame(rdd, schema)

# Build the (student_id, last_name) pairs for the lookup dataframe;
# names are matched to students positionally.
last_name = ["Granger", "Weasley", "Potter"]
data2 = [(sid, last_name[i]) for i, sid in enumerate(student_ids)]

# Schema for the (student_id, last_name) lookup table.
# NOTE: this rebinds `schema` (and `rdd` below), clobbering the
# scores versions defined above — fine here since they are no longer
# needed, but worth knowing when reading the script.
schema = StructType([
            StructField("student_id", StringType(), nullable=False),
            StructField("last_name", StringType(), nullable=False)
    ])

rdd = sc.parallelize(data2)
lastnames = sqlCtx.createDataFrame(rdd, schema)


scores.show()
lastnames.show()


from pyspark.sql.functions import udf
def getLastName(sid):
    # BROKEN — this is the reproduction the question asks about.
    # `lastnames` is a DataFrame; its handle wraps a JVM-side object
    # that cannot be serialized and shipped to the executors, which is
    # what produces the Py4JError (__getnewargs__) in the trace below.
    # Even if it could run, `tmp_df.last_name` is a Column object, not
    # the last-name string.
    tmp_df = lastnames.filter(lastnames.student_id == sid)
    return tmp_df.last_name

getLastName_udf = udf(getLastName, StringType())
scores.withColumn("last_name", getLastName_udf("student_id")).show(10)

And the following is the last part of the trace:

Py4JError: An error occurred while calling o114.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

Solution

  • Change the list of pairs into a dictionary, for easy lookup of names by student id:

    # Map each student_id directly to its last name for O(1) lookups.
    data2 = {student_ids[i]: last_name[i] for i in range(len(student_ids))}
    

    Instead of creating an RDD and turning it into a DataFrame, create a broadcast variable:

    # rdd = sc.parallelize(data2)
    # lastnames = sqlCtx.createDataFrame(rdd, schema)
    # Ship the lookup dict to every executor once as a broadcast variable.
    lastnames = sc.broadcast(data2)  
    

    Now access the dictionary inside the UDF through the `value` attribute of the broadcast variable (`lastnames`):

    from pyspark.sql.functions import udf
    def getLastName(sid):
        """Look up a student's last name in the broadcast dictionary.

        Runs on the executors; `lastnames.value` is the plain Python
        dict carried by the broadcast variable, so no DataFrame is
        referenced inside the UDF.
        """
        return lastnames.value[sid]