
unionByName is only using a single core in Apache Spark


This has been driving me a little crazy, so any help is greatly appreciated.

I have a list of dataframes df_list with maybe around 500 small dataframes (I ingested CSVs, wrote them out as parquets, then read each one back as a Spark DataFrame and appended it to the list).
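Roughly how df_list gets built (the path here is a placeholder for my real one):

import glob

df_list = [spark.read.parquet(path) for path in glob.glob("/data/parquets/*")]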

I'm then trying to merge them like this:

from functools import reduce

merged_df = reduce(lambda left, right: left.unionByName(right, allowMissingColumns=True), df_list)

The problem is that this only uses a single core and is bottlenecked; I don't understand why my other cores aren't kicking in (I'm watching htop).

I also tried this:

df_list = [df.repartition(16) for df in df_list]  # Adjust the number based on your cluster size
merged_df = df_list[0]
for df in df_list[1:]:
    merged_df = merged_df.unionByName(df, allowMissingColumns=True)

No luck. How can I get more cores engaged? How can I get Spark to distribute this merge across multiple cores/executors? Does this question even make sense, and if not, what am I fundamentally misunderstanding here?

I have a 32-core EC2 instance with 128 GB of memory.

Below is the environment. Basically, I have a Spark master and one worker (with 5 executors, each with 4 cores and 16 GB of RAM) running on a single Ubuntu machine. I'm running this from a Jupyter notebook and initializing my Spark session like this:

spark = (
    SparkSession
    .builder
    .master("spark://<removed by me>:9077")
    .appName("spark_experiment_mark_21")
    .config("spark.executor.memory", "16g")
    .config("spark.dynamicAllocation.initialExecutors", 5)
    .config("spark.executor.cores", 4)
    .config("spark.driver.memory", "30g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size","10g")
    .getOrCreate()
)

I believe most of the values are taking effect, since I can see them in the Spark configuration. I also set the following environment variables:

SPARK_WORKER_CORES=20
SPARK_WORKER_MEMORY=80g

and the following Spark properties (perhaps redundantly):

spark.dynamicAllocation.initialExecutors        5
spark.executor.memory                           16g
spark.executor.cores                            4
spark.memory.offHeap.enabled                    true
spark.memory.offHeap.size                       20g

The spark.memory.offHeap.size I set in the SparkSession builder (10g) is overriding the one in the properties file (20g).


Solution

  • The question does not quite make sense, and although I get your point, the reality is:

    1. reduce is a plain Python loop, so it runs serially on the driver, and
    2. unionByName is a lazy transformation: chaining ~500 of them just builds a deeper and deeper logical plan on the driver, and that single-threaded plan construction is your bottleneck, not the executors.

    You would need to roll your own code/logic, somehow; see the sketches below.
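One way to sidestep the driver-side union chain entirely, assuming the ~500 parquet files sit under a common directory (the path below is a placeholder), is to let Spark read them all in a single distributed job with parquet schema merging; mergeSchema fills columns missing from individual files with nulls, much like allowMissingColumns=True does:

# One distributed scan instead of ~500 chained unions; mergeSchema
# reconciles the differing schemas (missing columns become null).
merged_df = (
    spark.read
    .option("mergeSchema", "true")
    .parquet("/data/parquets/")  # placeholder: directory holding the files
)

Because this is one job, every executor core participates in the scan. If the schemas are too irregular for mergeSchema, a rough fallback sketch is a balanced (pairwise) reduction over df_list, which cuts the union plan's depth from ~500 to ~9 levels and so shortens the driver's single-threaded analysis:

def tree_union(dfs):
    # Pairwise unions: logical-plan depth O(log n) instead of O(n).
    while len(dfs) > 1:
        dfs = [
            dfs[i].unionByName(dfs[i + 1], allowMissingColumns=True)
            if i + 1 < len(dfs)
            else dfs[i]
            for i in range(0, len(dfs), 2)
        ]
    return dfs[0]

merged_df = tree_union(df_list)

Either way, the actual work is still triggered only by an action (e.g. a write), but with these sketches that action runs as a parallel job across the executors.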