Search code examples
apache-sparkpysparkhadoop-yarngoogle-cloud-dataproc

Dataproc CPU usage too low even though all the cores got used


Issue: I run a spark job that uses up all the cores on all the nodes and yet in the Dataproc CPU monitoring graph the CPU usage touches a max of 12%

I have a dummy cluster with 2 nodes. Each node has:

  1. 16 GiB memory
  2. 16 cores

I start the spark session with the following configuration and run the following simple code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.executor.cores","5").\
config("spark.executor.instances","6").\
config("spark.executor.memory","4100m").\
config("spark.dynamicAllocation.enabled", False).\
getOrCreate()
df_sales = spark.read.parquet("gs://xxxxxxx-xxxxxxxx.appspot.com/spark_datasets/sales_parquet_dtc")
df_sales.count()

and, the yarn UI does show that I am able to allocate almost all of the clusters via YARN.

I can see in the progress bar that 30 tasks (6 executors* 5 cores per executor) are running in parallel. Hence all of the available cluster minus 2 cores that were intentionally left out for the OS was put to use

enter image description here

Yet, in the Monitoring, the CPU utilization touches a maximum of only 12.5%

enter image description here

Here is the DAG that got generated:

enter image description here

Why is CPU utilization such a small number? Am I wrong in assuming that the CPU utilization should have been higher than this?

I have also checked with df_sales.withColumn("part_id", spark_partition_id()).groupby("part_id").count().collect() the size of all my partitions are almost identical

Following are the stagewise DAGs: Stage 0: enter image description here

Stage 1: enter image description here

Stage2: Shows as skipped

Stage3: enter image description here

Furthermore, When I look at the event timeline for the stage 1 (which is the one with 78 tasks) I can see that 30 cores are actually simultaneously busy most of the time

enter image description here

Am I wrong in thinking that any time when a vertical line intersects 30 green bars, 30 cores are simultaneously busy and at that moment the CPU utilization should be 30/32 at a minimum?

I have been able to get the CPU usage up to 63% according to the yarn metrics chart I am no longer sure how much to trust for the reasons given below:

Thinking that maybe my workload is very trivial compared to the network time I cached the data and then ran a few cpu intensive transformations on the data. I also read in pretty heavy partitions

#spark config
spark = SparkSession.builder.\
config("spark.executor.cores","15").\
config("spark.executor.instances","2").\
config("spark.executor.memory","12100m").\
config("spark.dynamicAllocation.enabled", False).\
config("spark.sql.adaptive.enabled", False).\
config("spark.sql.files.maxPartitionBytes","9g").\
getOrCreate()


#cache the data (the data is 55 GiBs in csv and ~10 GiB in memory deserialized)
df_covid = spark.read.csv("gs://monsoon-credittech.appspot.com/spark_datasets/covid40g.csv",
                          header=True, inferSchema=False)
df_covid.cache()
df_covid.count()


#Run heavy transformations
df_covid = df_covid.withColumn("catted",F.concat_ws('',*df_covid.columns))
df_covid = df_covid.withColumn("size0", F.length(F.split("catted", "a").getItem(0)))
df_covid = df_covid.withColumn("size1", F.length(F.split("catted", "a").getItem(1)))
df_covid = df_covid.withColumn("size2", F.length(F.split("catted", "a").getItem(2)))
df_covid.filter((df_covid.size0 > 5) & (df_covid.size1 > 5) & (df_covid.size2 > 5)).count()

I monitored the CPU usage via

  1. Dataproc yarn CPU usage chart (touches 63% max)
  2. htop from within workers (all cores are usually busy completely)

dataproc yarn

htop in two nodes As you can see all the cores are busy and yet the YARN monitoring shows that max of 63% of my CPU got used

At this point I do not know how to make sense of the YARN monitoring live chart

Hoping someone helps me understand what is going on here


Solution

  • The screenshot of your progress bar does indeed show that there were 30 tasks running at the same time. This makes sense, given the amount of CPUs in your cluster (5 * 6 = 30).

    In general, you have 1 cpu core per task (unless you set the spark.task.cpus config parameter to something different than the default 1, which you did not do). So you would indeed guess that if you have 30 tasks that are running, you would have a CPU utilization of +-94% (30/32).

    What's going on here then, how come your utilization is so low? Well, it is not because a task reserves 1 cpu that it really utilizes 1 cpu. This means that there is something else that is stalling your tasks, and making your CPU wait. There is another bottleneck in the story.

    What could those bottlenecks be? Let's take a closer look at Stage 1 (which is the stage we're talking about). The rough structure of this task is the following:

    • Scan parquet (this is the reading of your files)
      • Possible bottlenecks:
        • If your underlying storage technology is slower than what your cpus can process (imagine using floppy disks to exaggerate)
        • If the network connection to your storage layer is overloaded
        • If the communication delay between your cpu and storage (your ping) is very large, your CPUs will have to wait too
    • WholeStageCodeGen (your count happens here, which can be optimized by Spark)
      • No immediately obvious bottleneck here
    • Exchange (this is a shuffle, because those 78 tasks calulcated a partial count, now a total count needs to be done so a shuffle is needed)
      • No immediately obvious bottleneck here, since the data being shuffled is so small

    I would guess that something might be up with the reading of your file.

    Try the following:

    • Run this exact same code but with only 1 executor, with 1 CPU and check the CPU utilization.

      • If your CPU utilization goes closer to 100% efficiency, this might mean that either your network to your storage layer can't handle the traffic or your storage layer can't handle the necessary IO (does not seem very likely with these data volumes)
      • If it does not change, I would guess that the delay to your storage layer might be the culprit.

    But to finish off: You're doing a .count operation on a parquet file that is +-10GB large. A .count operation reads a VERY small amount of data, and processes a VERY small amount of data. So your CPUs have to do VERY little work. This means that any work that your CPUs don't do but still use time (like sending read queries to your storage, ...) will actually use a proportionally large amount of time. I would guess that if you do more heavy work (for example df.withColumn("substring", substring(col("MyStringCol"))).show) your CPU utilization will go closer to your expected 94%.