pyspark apache-spark-sql

Passing dataframe column as an argument to a function in pyspark


I'm new to pyspark and trying to explore some new methods of implementation. I'm trying to pass a derived column of a dataframe as an argument to a function that runs a query and returns a value.

from pyspark.sql.functions import col

def getValue(col):
    # look up a single value from config_table for the given key
    cfg = spark.sql("select value from config_table where key='" + str(col) + "'")
    value = cfg.collect()[0][0]
    return value

def main():
    df_output = df.withColumn('derived', getValue(col('col_to_fetch_the_value')))

The above code results in an error. Thanks in advance!

The code below works for me, but it can't be used here because the key value can change per row and can't be hardcoded:

def getValue(col):
    cfg = spark.sql("select value from config_table where key='" + str(col) + "'")
    value = cfg.collect()[0][0]
    return value

def main():
    df_output = df.withColumn('derived', getValue('key'))

Solution

  • This seems like an opportunity to use joins.

    Example: left join

    If your input df contains the look-up key, you can use a left join to attach the derived value to the rows where that key is defined:

    config_df = spark.sql("SELECT key, value FROM config_table")
    
    df_output = (
        df.join(config_df, df.col_to_fetch_the_value == config_df.key, "left")
          .withColumnRenamed("value", "derived")
    )
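    After the join, the config table's key column is still present in the output; if you don't need it, drop it with .drop(config_df.key).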
    

    getValue() Function

    From what I can see so far, there is not a clear case for using the getValue function over a join. If you do have a case where you must use getValue, wrap it with PySpark's udf function so it can be passed as a column expression: https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html
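
    Below is a minimal sketch of that approach. One caveat: spark.sql cannot be called from inside a UDF, because UDFs run on executors where no SparkSession is available, so the sketch first collects the config table into a plain dict on the driver and has the UDF look values up there. The names config_map and get_value_udf are made up for illustration, and the value column is assumed to be a string.

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    # Build a driver-side lookup dict once; spark.sql cannot run
    # inside a UDF, so the query happens here on the driver.
    config_map = {row["key"]: row["value"]
                  for row in spark.sql("SELECT key, value FROM config_table").collect()}
    
    # Hypothetical UDF; assumes config values are strings (StringType).
    get_value_udf = udf(lambda k: config_map.get(k), StringType())
    
    df_output = df.withColumn("derived", get_value_udf("col_to_fetch_the_value"))

    This only makes sense if config_table is small enough to collect onto the driver; for anything larger, the join above is the better fit.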