apache-spark, pyspark, hyperopt

Setting task slots with pyspark on an individual machine


I am trying to run hyperparameter optimization of an ML model using SparkTrials from the hyperopt library. I am running this on a single machine with 16 cores, but when I run the following code, which sets the number of cores to 8, I get a warning that seems to indicate that only one core is used.

SparkTrials accepts a spark_session argument, which in theory is where I set the number of cores.

Can anyone help me?

Thanks!

import os, shutil, tempfile
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
import numpy as np
from sklearn import linear_model, datasets, model_selection
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").config('spark.local.dir', './').config("spark.executor.cores", 8).getOrCreate()

def gen_data(bytes):
  """
  Generates train/test data with target total bytes for a random regression problem.
  Returns (X_train, X_test, y_train, y_test).
  """
  n_features = 100
  n_samples = int(1.0 * bytes / (n_features + 1) / 8)
  X, y = datasets.make_regression(n_samples=n_samples, n_features=n_features, random_state=0)
  return model_selection.train_test_split(X, y, test_size=0.2, random_state=1)

def train_and_eval(data, alpha):
  """
  Trains a LASSO model using training data with the input alpha and evaluates it using test data.
  """
  X_train, X_test, y_train, y_test = data  
  model = linear_model.Lasso(alpha=alpha)
  model.fit(X_train, y_train)
  loss = model.score(X_test, y_test)
  return {"loss": loss, "status": STATUS_OK}

def tune_alpha(objective):
  """
  Uses Hyperopt's SparkTrials to tune the input objective, which takes alpha as input and returns loss.
  Returns the best alpha found.
  """
  best = fmin(
    fn=objective,
    space=hp.uniform("alpha", 0.0, 10.0),
    algo=tpe.suggest,
    max_evals=8,
    trials=SparkTrials(parallelism=8, spark_session=spark))
  return best["alpha"]

data_small = gen_data(10 * 1024 * 1024)  # ~10MB

def objective_small(alpha):
  # For small data, you might reference it directly.
  return train_and_eval(data_small, alpha)

tune_alpha(objective_small)

Parallelism (8) is greater than the current total of Spark task slots (1). If dynamic allocation is enabled, you might see more executors allocated.


Solution

  • If you are on a cluster: a "core" in Spark nomenclature is unrelated to a physical core in your CPU. With spark.executor.cores you specify the maximum number of threads (= tasks) each executor (you have one here) can run, which is 8. If you want to increase the number of executors, you have to use --num-executors on the command line or the spark.executor.instances configuration property in your code.

    I suggest trying a configuration like the following if you are on a YARN cluster:

    spark.conf.set("spark.dynamicAllocation.enabled", "true")
    spark.conf.set("spark.executor.cores", 4)
    spark.conf.set("spark.dynamicAllocation.minExecutors","2")
    spark.conf.set("spark.dynamicAllocation.maxExecutors","10")
    

    Please note that the options above are not available in local mode.

  • Local: in local mode you only have one executor, and if you want to change the number of its worker threads (which is one by default) you have to set the master to something like local[*] or local[16] (see the sketch below).
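
    For the single-machine case in the question, a minimal sketch (keeping the rest of the posted code unchanged) is to change the master URL so that local mode starts enough worker threads to match SparkTrials(parallelism=8):

    from pyspark.sql import SparkSession
    from hyperopt import SparkTrials

    # "local" runs a single worker thread (1 task slot); "local[8]" runs 8,
    # and "local[*]" uses one thread per available core on the machine.
    spark = (SparkSession.builder
             .master("local[8]")
             .config("spark.local.dir", "./")
             .getOrCreate())

    # With 8 task slots available, the 8 parallel trials can actually run concurrently.
    trials = SparkTrials(parallelism=8, spark_session=spark)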