As per the post , the output of spark.sparkContext.getConf().get()
does not always return a value for a configuration parameter. For some configuration parameters, it only returns a value if we have explicitly set that value.
I'm using Databricks/Dataproc, not setting any params explicitly. When we dont set any params explicitly, print(spark.sparkContext.getConf().get("spark.executor.cores"))
returns None. Also don't see a mention of spark.executor.cores
/spark.executor.instances
in spark.sparkContext.getConf().getAll()
. In such cases, how can we check default value for a config being used by Spark?
You can do something like this SO answer proposes:
import json
import urllib.request
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
u = sc.uiWebUrl + "/api/v1/applications/" + sc.applicationId + "/allexecutors"
with urllib.request.urlopen(u) as url:
executors_data = json.loads(url.read().decode())
Now, this executors_data
is a python list (one element per executor) that contains a bunch of information about your executors. It looks like this:
[
{
"id": "driver",
"hostPort": "XXX",
"isActive": "True",
"rddBlocks": 0,
"memoryUsed": 0,
"diskUsed": 0,
"totalCores": 16,
"maxTasks": 16,
"activeTasks": 0,
"failedTasks": 0,
"completedTasks": 0,
"totalTasks": 0,
"totalDuration": 26786,
"totalGCTime": 21,
"totalInputBytes": 0,
"totalShuffleRead": 0,
"totalShuffleWrite": 0,
"isBlacklisted": "False",
"maxMemory": 455501414,
"addTime": "2024-01-25T10:34:44.08 9GMT",
"executorLogs": {},
"memoryMetrics": {
"usedOnHeapStorageMemory": 0,
"usedOffHeapStorageMemory": 0,
"totalOnHeapStorageMemory": 455501414,
"totalOffHeapStorageMemory": 0
},
"blacklistedInStages": [],
"peakM emoryMetrics": {
"JVMHeapMemory": 148771744,
"JVMOffHeapMemory": 68732808,
"OnHeapExecutionMemory": 0,
"OffHeapExecutionMemory": 0,
"OnHeapStorageMemory": 0,
"OffHeapStorageMemory": 0,
"OnHeapUnifiedMemory": 0,
" OffHeapUnifiedMemory": 0,
"DirectPoolMemory": 16385,
"MappedPoolMemory": 0,
"ProcessTreeJVMVMemory": 0,
"ProcessTreeJVMRSSMemory": 0,
"ProcessTreePythonVMemory": 0,
"ProcessTreePythonRSSMemory": 0,
"ProcessTreeO therVMemory": 0,
"ProcessTreeOtherRSSMemory": 0,
"MinorGCCount": 5,
"MinorGCTime": 21,
"MajorGCCount": 0,
"MajorGCTime": 0,
"TotalGCTime": 21
},
"attributes": {},
"resources": {},
"resourceProfileId": 0,
"isExclu ded": "False",
"excludedInStages": []
}
]
You'll notice that in my case I only have 1 executor (which is the driver). This is because I'm executing this on a pyspark shell with --master local
so that turns your driver into the only executor. In case you have an actual cluster where the driver and executors are separate JVM processes, you can filter the executors_data list like so:
executors_data = [x for x in executors_data if x["id"] != "driver"]
To extract info out of this list, you can do things like the following:
num_executors = len(executors_data)
total_cores = sum([x["totalCores"] for x in executors_data])