
Transforming a Python lambda function without a return value to PySpark


I have a working lambda function in Python that computes the highest similarity between each string in dataset1 and the strings in dataset2. During each iteration, it writes the string, its best match, and the similarity score, together with some other information, to BigQuery. There is no return value, as the purpose of the function is to insert a row into a BigQuery dataset. This process takes rather long, which is why I wanted to use PySpark and Dataproc to speed it up.

Converting the pandas dataframes to Spark was easy. I am having trouble registering my udf, because it has no return value and PySpark requires one. In addition, I don't understand how to map pandas' apply function to the PySpark equivalent. So basically my question is how to transform the Python code below to work on a Spark dataframe.

The following code works in a regular Python environment:

def embargomatch(name, code, embargo_names):
    # find best match
    # insert best match and additional information into BigQuery
    ...

customer_names.apply(lambda x: embargomatch(x['name'], x['customer_code'], embargo_names), axis=1)
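
For context, a minimal, runnable version of such a function might look like the sketch below. The difflib similarity and the google-cloud-bigquery client call are illustrative stand-ins for the real logic, and 'mydataset.matches' is a hypothetical table:

import difflib

from google.cloud import bigquery

bq_client = bigquery.Client()

def embargomatch(name, code, embargo_names):
    # Find the embargo name most similar to `name` (illustrative metric)
    best_match = max(embargo_names,
                     key=lambda c: difflib.SequenceMatcher(None, name, c).ratio())
    similarity = difflib.SequenceMatcher(None, name, best_match).ratio()
    # Insert one row into a hypothetical BigQuery table; no return value
    bq_client.insert_rows_json('mydataset.matches', [{
        'name': name,
        'customer_code': code,
        'best_match': best_match,
        'similarity': similarity,
    }])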

Because PySpark requires a return type, I added 'return 1' to the udf and tried the following:


customer_names = spark.createDataFrame(customer_names)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

embargo_match_udf = udf(lambda x: embargomatch(x['name'], x['customer_code'], embargo_names), IntegerType())

Now I'm stuck trying to apply the select function, as I don't know what parameters to pass.


Solution

  • I suspect you're stuck on how to pass multiple columns to the udf -- here's a good answer to that question: Pyspark: Pass multiple columns in UDF.
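
    That linked answer boils down to two idioms, sketched below with a hypothetical dataframe df that has string columns a and b: pass each column as its own udf argument, or pack the columns into a single array argument.

    from pyspark.sql.functions import udf, array
    from pyspark.sql.types import IntegerType

    # Idiom 1: one udf parameter per column
    two_arg_udf = udf(lambda a, b: len(a) + len(b), IntegerType())
    df.select(two_arg_udf('a', 'b'))

    # Idiom 2: pack the columns into a single array column;
    # the udf then receives one list-like argument
    one_arg_udf = udf(lambda cols: len(cols[0]) + len(cols[1]), IntegerType())
    df.select(one_arg_udf(array('a', 'b')))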

    Rather than creating a udf based on a lambda that wraps your function, consider simplifying by creating a udf based on embargomatch directly.

    embargo_names = ...

    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    # The parameters here are the columns passed into the udf
    def embargomatch(name, customer_code):
        pass

    embargo_match_udf = udf(embargomatch, IntegerType())

    # Pass each column as its own argument, matching the udf's two parameters
    customer_names.select(embargo_match_udf('name', 'customer_code').alias('column_name'))
    

    That being said, it's suspect that your udf doesn't return anything -- I generally see udfs as a way to add columns to a dataframe, not as a way to have side effects. If you want to insert records into BigQuery, consider doing something like this:

    import os

    # Spark writes a directory of part files, so load them with a wildcard
    customer_names.select('column_name').write.parquet('gs://some/path')
    os.system("bq load --source_format=PARQUET [DATASET].[TABLE] 'gs://some/path/*.parquet'")