Tags: python, apache-spark, pyspark, pyarrow

Problem running a Pandas UDF on a large dataset


I'm currently working on a project and I am having a hard time understanding how Pandas UDFs in PySpark work.

I have a Spark cluster with one master node with 8 cores and 64GB of memory, along with two workers with 16 cores and 112GB each. My dataset is quite large and divided into seven principal partitions, each consisting of ~78M lines. The dataset consists of 70 columns. I defined a Pandas UDF to do some operations on the dataset that can only be done using Python, on a Pandas dataframe.

The Pandas UDF is defined this way:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
    # Some operations on the Pandas DataFrame of one group
    return pdf

spark.table("my_dataset").groupBy(partition_cols).apply(operation)

There is absolutely no way to get the Pandas UDF to work: it crashes before even doing the operations. I suspect there is an OOM error somewhere. The code above runs for a few minutes before crashing with an error stating that the connection has been reset. However, if I call .toPandas() after filtering on one partition and then display it, it runs fine with no error. The error seems to happen only when using a Pandas UDF.

I fail to understand how it works. Does Spark try to convert one whole partition at once (78M lines)? If so, what memory does it use? The driver memory? The executor's? If it's on the driver, is all the Python code executed on it?

The cluster is configured with the following:

  • SPARK_WORKER_CORES=2
  • SPARK_WORKER_MEMORY=64g
  • spark.executor.cores 2
  • spark.executor.memory 30g (to leave memory for the Python instance)
  • spark.driver.memory 43g

Am I missing something, or is there just no way to run 78M lines through a Pandas UDF?


Solution

  • Does Spark try to convert one whole partition at once (78M lines)?

    That's exactly what happens. Spark 3.0 adds support for chunked UDFs, which operate on iterators of Pandas DataFrames or Series, but if your operations genuinely require the whole group as a single Pandas DataFrame, these might not be the right choice for you.
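
    For reference, a minimal sketch of the chunked style in Spark 3.0+ using mapInPandas; note that it processes arbitrary Arrow batches rather than whole groups, the table name and output schema below are placeholders, and an existing SparkSession named spark is assumed:

    from typing import Iterator
    import pandas as pd

    def operation_chunked(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        # Each pdf is one Arrow record batch, not a whole 78M-line partition,
        # so memory usage per batch stays bounded.
        for pdf in batches:
            # ... per-batch operations that don't need the whole group at once ...
            yield pdf

    result = spark.table("my_dataset").mapInPandas(
        operation_chunked,
        schema="col1 long, col2 double",  # placeholder for the real output schema
    )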

    If so, what memory does it use? The driver memory? The executor's?

    Each partition is processed locally on the respective executor, and the data is passed to and from the Python worker using Arrow streaming.

    Am I missing something, or is there just no way to run 78M lines through a Pandas UDF?

    As long as you have enough memory to handle the Arrow input and output (especially if data is copied), auxiliary data structures, as well as the JVM overhead, it should handle large datasets just fine.
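
    If you stay with Spark, one knob worth knowing is spark.executor.pyspark.memory, which caps the Python workers separately from the JVM heap. A sketch with illustrative values (not tuned recommendations):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("pandas-udf-job")
        # JVM heap per executor
        .config("spark.executor.memory", "20g")
        # hard cap for the Python worker processes (Spark 2.4+)
        .config("spark.executor.pyspark.memory", "8g")
        # off-heap room for Arrow buffers and other native allocations
        .config("spark.executor.memoryOverhead", "4g")
        .getOrCreate()
    )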

    But on such a small cluster, you'd be better off partitioning the output and reading the data directly with Pandas, without using Spark at all. This way you'll be able to use all the available resources (i.e. > 100GB per interpreter) for data processing instead of spending them on secondary tasks (having only 16GB minus overhead per interpreter). A rough sketch of that approach is shown below.
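
    A rough sketch of the Pandas-only approach, assuming the data is written out partitioned by the same grouping columns; the paths and partition value are illustrative:

    # Write the dataset out partitioned by the grouping columns (done once, in Spark).
    spark.table("my_dataset").write.partitionBy(*partition_cols).parquet("/data/my_dataset")

    # Then process each partition directly with Pandas, one at a time,
    # using the full memory of the machine for a single Python process.
    import pandas as pd

    pdf = pd.read_parquet("/data/my_dataset/part_col=value1")  # illustrative partition path
    result = operation(pdf)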