Tags: python, multithreading, scikit-learn, parallel-processing, joblib

Parallelize RandomizedSearchCV to restrict the number of CPUs used


I am trying to limit the number of CPUs used when I fit a model with sklearn's RandomizedSearchCV, but somehow I keep using all CPUs. Following an answer to "Python scikit learn n_jobs", I have seen that in scikit-learn we can use n_jobs to control the number of CPU cores used.

n_jobs is an integer, specifying the maximum number of concurrently running workers. If 1 is given, no joblib parallelism is used at all, which is useful for debugging. If set to -1, all CPUs are used.
For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For example with n_jobs=-2, all CPUs but one are used.
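
As a quick sanity check of the rule quoted above, here is a minimal sketch of the arithmetic. The helper name `effective_workers` is hypothetical (it is not a scikit-learn or joblib function), and clamping the result to at least one worker is an assumption added for the example.

```python
def effective_workers(n_jobs: int, n_cpus: int) -> int:
    """Resolve an n_jobs value to a worker count using the rule quoted above."""
    if n_jobs == 0:
        raise ValueError("n_jobs == 0 has no meaning")
    if n_jobs < 0:
        # Documented rule: n_cpus + 1 + n_jobs.
        # Clamping to at least 1 worker is an assumption made for this sketch.
        return max(n_cpus + 1 + n_jobs, 1)
    return n_jobs


# Show how a few n_jobs values resolve on a hypothetical 8-CPU machine.
for n_jobs in (1, 2, -1, -2, -5):
    print(n_jobs, "->", effective_workers(n_jobs, n_cpus=8))
```

Note that under this rule n_jobs=-5 does not mean "5 CPUs": it means "all CPUs but four", so the resulting worker count grows with the size of the machine.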

But when I set n_jobs to -5, all CPUs still run at 100%. I then looked into the joblib library to use Parallel and delayed, but all my CPUs are still being used. Here is what I tried:

from sklearn.model_selection import RandomizedSearchCV
from joblib import Parallel,delayed

def rscv_l(model, param_grid, X_train, y_train):
    rs_model = RandomizedSearchCV(model, param_grid, n_iter=10,
                            n_jobs=-5, verbose=2, cv=5,
                            scoring='r2')
    rs_model.fit(X_train, y_train)         # the cpu usage problem comes here
    return rs_model

# Here my attempt to parallelize and set my function as iterable
results = Parallel( n_jobs = -5 )( delayed( rscv_l )( model,
                                                      param_grid,
                                                      X, y )
                                                  for X, y
                                             in zip( [X_train],
                                                     [y_train] ) ) 

What is going wrong?


UPDATE: Looking at "How do you stop numpy from multithreading?", I think I might have a multithreading problem. When I inspect the numpy configuration, I find:

blas_mkl_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['user/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['user/include']
blas_opt_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['user/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['user/include']
lapack_mkl_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['user/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['user/include']
lapack_opt_info:
    libraries = ['mkl_rt', 'pthread']
    library_dirs = ['user/lib']
    define_macros = [('SCIPY_MKL_H', None), ('HAVE_CBLAS', None)]
    include_dirs = ['user/include']

but the proposed solutions still do not work for me:

import os
os.environ["OMP_NUM_THREADS"]        = "4" # export OMP_NUM_THREADS=4
os.environ["OPENBLAS_NUM_THREADS"]   = "4" # export OPENBLAS_NUM_THREADS=4 
os.environ["MKL_NUM_THREADS"]        = "6" # export MKL_NUM_THREADS=6
os.environ["VECLIB_MAXIMUM_THREADS"] = "4" # export VECLIB_MAXIMUM_THREADS=4
os.environ["NUMEXPR_NUM_THREADS"]    = "6" # export NUMEXPR_NUM_THREADS=6

import numpy
from sklearn.model_selection import RandomizedSearchCV
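
One thing worth ruling out with this approach: these variables are typically read when the native libraries are first loaded, so setting them after numpy has already been imported anywhere in the process may have no effect. The sketch below, using only the standard library, sets the same variables and reports whether numpy was already loaded. The helper name `cap_native_threads` is hypothetical, and whether import order is the cause in this particular case is an assumption, not something the post confirms.

```python
import os
import sys

# The same variables as in the snippet above.
THREAD_ENV_VARS = (
    "OMP_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "MKL_NUM_THREADS",
    "VECLIB_MAXIMUM_THREADS",
    "NUMEXPR_NUM_THREADS",
)


def cap_native_threads(n_threads: int) -> bool:
    """Set the thread-count variables; return False if numpy was already imported."""
    already_loaded = "numpy" in sys.modules
    for name in THREAD_ENV_VARS:
        os.environ[name] = str(n_threads)
    return not already_loaded


if not cap_native_threads(4):
    print("warning: numpy was imported earlier; the caps may be ignored")
```

Calling the helper at the very top of the entry-point script, before any other import, is the safest placement under that assumption.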

THIS SOLVED MY PROBLEM: Thanks to @user3666197's answer, I decided to limit the number of CPUs for the whole script and simply use n_jobs with a positive integer. This solved my CPU usage problem:

import os
from sklearn.model_selection import RandomizedSearchCV
n_jobs = 2  # The number of tasks to run in parallel
n_cpus = 2  # Number of CPUs assigned to this process

pid = os.getpid()
print("PID: %i" % pid)

# Control which CPUs are made available for this script
cpu_arg = ','.join(str(ci) for ci in range(n_cpus))  # e.g. "0,1" for n_cpus = 2
cmd = 'taskset -cp %s %i' % (cpu_arg, pid)
print("executing command '%s' ..." % cmd)
os.system(cmd)

# hyperparameter tuning
rs_model = RandomizedSearchCV(xgb, param_grid, n_iter=10,
                            n_jobs=n_jobs, verbose=2, cv= n_folds,
                            scoring='r2')

# model fitting
rs_model.fit(X_train,y_train)
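
The same kind of pinning can be sketched without shelling out to taskset, using `os.sched_setaffinity` from the standard library. This is Linux-only, and the helper name `pin_to_cpus` is hypothetical. Unlike the taskset command above, which always requests CPUs 0..n_cpus-1, this sketch picks from the CPUs the process is already allowed to use.

```python
import os


def pin_to_cpus(n_cpus: int) -> set:
    """Restrict the current process to the first n_cpus CPUs it may already use."""
    if not hasattr(os, "sched_setaffinity"):
        raise OSError("CPU affinity control is not available on this platform")
    # pid 0 means "the calling process"
    allowed = sorted(os.sched_getaffinity(0))[:n_cpus]
    os.sched_setaffinity(0, allowed)
    return os.sched_getaffinity(0)


print("running on CPUs:", pin_to_cpus(2))
```

Calling this before the search starts matters, because worker processes created afterwards inherit the affinity mask of their parent.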

Solution

  • Q: "What is going wrong?"

    A:
    There is no single thing we can point to as "going wrong". The code-execution ecosystem is multi-layered, so this is less trivial than we might wish: there are several places (some of them hidden) where configuration decides how many CPU cores will actually carry the overall processing load.

    The situation is also version-dependent and configuration-specific (scikit-learn, NumPy and SciPy depend on each other, and on the compilation options of the numerical packages underneath them).


    Experiment
    to prove or refute the assumed syntax effect (or defect):

    Given the documented interpretation of negative values of the top-level n_jobs parameter in RandomizedSearchCV(...), submit the very same task, but configured with an explicit, positive n_jobs = CPU_cores_allowed_to_load, and observe when and how many cores actually get loaded over the whole flow of processing.

    Results:
    if and only if exactly that number of "permitted" CPU cores was loaded did the top-level call correctly propagate the parameter setting to each and every method or procedure used along the flow of processing.

    If your observation shows the setting was not obeyed, we can only review all the source-code layers involved to decide which of them fails to keep the work within the ceiling set by the top-level n_jobs.

    O/S tools for CPU-core affinity mapping do give us a way to restrict the number of cores "externally", but they bring adverse effects of their own (the add-on management costs being the least performance-punishing ones). Cores get hot under numerically intensive processing, and on contemporary processors thermal management responds by lowering their clock frequency. Without an affinity map, the work could "hop" to cooler (thus faster) cores while the hot ones cool down and regain the chance to run at full clock rate. With an affinity map, those cooler cores are exactly the ones we have forbidden, so the work stays on throttled cores and the overall processing time grows.

    The top-level call might have set an n_jobs parameter, yet any lower-level component might have "obeyed" that same value on its own, without knowing how many concurrently working peers did the same. joblib.Parallel() and similar constructors behave this way, not to mention the other GIL-evading multithreading libraries deployed underneath, all of which lack any mutual coordination that would keep the total within the top-level n_jobs ceiling.

    from joblib import Parallel, delayed
    from sklearn.model_selection import RandomizedSearchCV

    def rscv_l( model, param_grid, X_train, y_train ):
        rs_model = RandomizedSearchCV( model,
                                       param_grid,
                                       n_iter  = 10,
                                       n_jobs  =  1, # DO NOT CANNIBALISE MORE
                                       verbose =  2, #        AS BEING RUN
                                       cv      =  5, #        IN CONFLICT
                                       scoring = 'r2'#        WITH OUTER-SETTINGS
                                       )             # ----vvv----------
        rs_model.fit( X_train, y_train )             # the cpu usage problem comes here
        return rs_model
    
    ################################################################
    #
    # Here my attempt to parallelize and set my function as iterable
    #
    results = Parallel( n_jobs = -5 # <------------- joblib spawns that many workers
                        )( delayed( rscv_l ) # <---# HERE, avoid
                                  ( model,         #       UNCOORDINATED
                                    param_grid,    #       CPU-CANNIBALISM
                                    X, y )         #       ref. above
                                for X, y in zip( [X_train],
                                                 [y_train] )
                           )
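
    The multiplication effect described above can be estimated with a few lines of arithmetic. This is a worst-case sketch under the assumption that no layer coordinates with any other; the function name and the 8-CPU figures are hypothetical, and real libraries may apply limits of their own that reduce these numbers.

```python
def worst_case_threads(outer_workers: int, inner_workers: int, native_threads: int) -> int:
    """Upper bound on runnable threads when every layer sizes itself independently."""
    return outer_workers * inner_workers * native_threads


n_cpus = 8  # hypothetical machine

# Nested setup from the question: n_jobs = -5 resolves to 4 workers on 8 CPUs,
# at both levels, and each worker may start one native thread per CPU.
nested = worst_case_threads(outer_workers=4, inner_workers=4, native_threads=n_cpus)

# Inner search held at n_jobs = 1 and native threads capped at 1 per worker.
coordinated = worst_case_threads(outer_workers=4, inner_workers=1, native_threads=1)

print("nested:     ", nested, "threads on", n_cpus, "CPUs")
print("coordinated:", coordinated, "threads on", n_cpus, "CPUs")
```

    The point of the comparison is that the ceiling only holds when exactly one layer is allowed to fan out and the others are pinned to 1.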
    

    If interested in more details, you may also like:

      • "How to find an optimum number of processes in GridSearchCV( ..., n_jobs = ... )?"
      • "How does scikit-learn handle..."
      • "How to find ideal number of parallel processes..."

    as well as other sources covering this problem.