
PySpark apply complex function to every row of dataframe to construct new column


I am using a Spark notebook in Microsoft Fabric. I want to construct a column mapping from metadata stored in a lakehouse and write that mapping into a "mapping" column of a dataframe that holds a list of tables.

My current attempt looks like this:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Create initial list of table data
dataframe_tablelist = spark.createDataFrame(
    [
        ("abcd", "AB", "t1"),
        ("efgh", "CD", "t2"),
        ("efgh", "CD", "t3"),
    ],
    ["database", "entity", "table_name"]
)

def construct_mapping(database, entity, table_name):
    meta_name = "Metadata_" + database + "_" + entity + "_" + table_name
    metadata = spark.sql(f"""select * from {meta_name}""")
    # Here I would construct the mapping from the metadata
    return meta_name

udf_constructor = udf(construct_mapping, StringType())

mapping_df = dataframe_tablelist.withColumn(
    "test_column",
    udf_constructor(
        dataframe_tablelist.database,
        dataframe_tablelist.entity,
        dataframe_tablelist.table_name,
    ),
)

display(mapping_df)

I get this error that I don't understand at all:

PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

I could probably get it to work with collect() and appending row by row, but I want to do it the "right" way.


Solution

  • You are calling spark.sql inside a UDF. The UDF is serialized and shipped to the worker nodes, but the SparkSession exists only on the driver and cannot be pickled along with it; that is exactly what the CONTEXT_ONLY_VALID_ON_DRIVER error is telling you.

    A solution I would suggest is to keep construct_mapping as a plain Python function instead of a UDF. Your idea of using collect() is therefore a good one: collect the table list to the driver, call the function for each row there, and build the result DataFrame from that. Because that code runs on the driver node, it can reference the Spark session freely (see the sketch below).

    Also note that Python UDFs carry serialization overhead, since every row is shuttled between the JVM and the Python workers, so they are generally less efficient than Spark's built-in functions; doing this kind of per-row work in a UDF would not be particularly performant anyway.
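A minimal sketch of that driver-side approach, reusing dataframe_tablelist and the construct_mapping function from the question (with the udf() wrapping removed) and assuming the Metadata_* tables actually exist in the lakehouse:

from pyspark.sql.types import StructType, StructField, StringType

# Pull the (small) table list onto the driver, where spark.sql is available
rows = dataframe_tablelist.collect()

# Call the plain Python function once per row on the driver
mapped = [
    (r.database, r.entity, r.table_name,
     construct_mapping(r.database, r.entity, r.table_name))
    for r in rows
]

# Build the result DataFrame, now including the mapping column
schema = StructType([
    StructField("database", StringType()),
    StructField("entity", StringType()),
    StructField("table_name", StringType()),
    StructField("mapping", StringType()),
])
mapping_df = spark.createDataFrame(mapped, schema)
display(mapping_df)

Since the table list is tiny (one row per table), collecting it to the driver costs next to nothing, and every spark.sql call happens where the SparkSession actually lives.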