apache-spark, pyspark

Retrieving the default configurations


As per the post, the output of spark.sparkContext.getConf().get() does not always return a value for a configuration parameter. For some parameters, it only returns a value if we have explicitly set it.

I'm using Databricks/Dataproc and am not setting any params explicitly. When no params are set explicitly, print(spark.sparkContext.getConf().get("spark.executor.cores")) returns None. I also don't see any mention of spark.executor.cores or spark.executor.instances in spark.sparkContext.getConf().getAll(). In such cases, how can we check the default value for a config being used by Spark?
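
For reference, a minimal sketch of the behaviour described above (assuming a session where these parameters were never set explicitly; the exact output depends on your platform):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    conf = spark.sparkContext.getConf()

    # Returns None because the value was never set explicitly
    print(conf.get("spark.executor.cores"))

    # getAll() only lists explicitly-set entries, so the key is typically absent
    print([k for k, _ in conf.getAll() if k == "spark.executor.cores"])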


Solution

  • You can do something along the lines of what this SO answer proposes:

    import json
    import urllib.request

    from pyspark.sql.session import SparkSession

    spark = SparkSession.builder.getOrCreate()

    sc = spark.sparkContext
    # Ask the Spark UI's REST API for details of all executors in this application
    u = sc.uiWebUrl + "/api/v1/applications/" + sc.applicationId + "/allexecutors"
    with urllib.request.urlopen(u) as url:
        executors_data = json.loads(url.read().decode())
    

    Now, this executors_data is a Python list (one element per executor) that contains a bunch of information about your executors. It looks like this:

    [
      {
        "id": "driver",
        "hostPort": "XXX",
        "isActive": "True",
        "rddBlocks": 0,
        "memoryUsed": 0,
        "diskUsed": 0,
        "totalCores": 16,
        "maxTasks": 16,
        "activeTasks": 0,
        "failedTasks": 0,
        "completedTasks": 0,
        "totalTasks": 0,
        "totalDuration": 26786,
        "totalGCTime": 21,
        "totalInputBytes": 0,
        "totalShuffleRead": 0,
        "totalShuffleWrite": 0,
        "isBlacklisted": "False",
        "maxMemory": 455501414,
        "addTime": "2024-01-25T10:34:44.08 9GMT",
        "executorLogs": {},
        "memoryMetrics": {
          "usedOnHeapStorageMemory": 0,
          "usedOffHeapStorageMemory": 0,
          "totalOnHeapStorageMemory": 455501414,
          "totalOffHeapStorageMemory": 0
        },
        "blacklistedInStages": [],
        "peakM emoryMetrics": {
          "JVMHeapMemory": 148771744,
          "JVMOffHeapMemory": 68732808,
          "OnHeapExecutionMemory": 0,
          "OffHeapExecutionMemory": 0,
          "OnHeapStorageMemory": 0,
          "OffHeapStorageMemory": 0,
          "OnHeapUnifiedMemory": 0,
          " OffHeapUnifiedMemory": 0,
          "DirectPoolMemory": 16385,
          "MappedPoolMemory": 0,
          "ProcessTreeJVMVMemory": 0,
          "ProcessTreeJVMRSSMemory": 0,
          "ProcessTreePythonVMemory": 0,
          "ProcessTreePythonRSSMemory": 0,
          "ProcessTreeO therVMemory": 0,
          "ProcessTreeOtherRSSMemory": 0,
          "MinorGCCount": 5,
          "MinorGCTime": 21,
          "MajorGCCount": 0,
          "MajorGCTime": 0,
          "TotalGCTime": 21
        },
        "attributes": {},
        "resources": {},
        "resourceProfileId": 0,
        "isExclu ded": "False",
        "excludedInStages": []
      }
    ]
    

    You'll notice that in my case I only have 1 executor (which is the driver). This is because I'm running this in a pyspark shell with --master local, which turns the driver into the only executor. If you have an actual cluster where the driver and executors are separate JVM processes, you can filter the executors_data list like so:

    executors_data = [x for x in executors_data if x["id"] != "driver"]
    

    To extract info out of this list, you can do things like the following:

    num_executors = len(executors_data)
    total_cores = sum([x["totalCores"] for x in executors_data])
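
    To get back to the original question about spark.executor.cores, one possible follow-up (my own sketch, assuming all executors report the same "totalCores", which is the usual case but not something the REST API guarantees) is:

    # Effective cores per executor, assuming a homogeneous cluster where
    # every executor reports the same "totalCores" value.
    cores_per_executor = executors_data[0]["totalCores"] if executors_data else None
    print(f"{num_executors} executors with {cores_per_executor} cores each")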