Tags: python, machine-learning, scikit-learn, xgboost

sklearn StackingClassifier and sample weights


I have a stacking workflow similar to the following:

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import StackingClassifier
from sklearn.pipeline import make_pipeline
import xgboost as xgb

X = np.random.random(size=(1000, 5))
y = np.random.choice([0,1], 1000)
w = np.random.random(size=(1000,))

scaler = StandardScaler()
log_reg = LogisticRegression()

params = {
    'n_estimators': 10,
    'max_depth': 3,
    'learning_rate': 0.1
}

log_reg_pipe = make_pipeline(
    scaler,
    log_reg
)

stack_pipe = make_pipeline(
    StackingClassifier(
        estimators=[('lr', log_reg_pipe)],
        final_estimator=xgb.XGBClassifier(**params),
        passthrough=True,
        cv=2
    )
)

I'd like to be able to pass sample weights into XGBoost. My question is: how do I set sample weights in the final estimator?
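
Note that the final estimator itself is not the obstacle; xgboost's scikit-learn wrapper accepts weights directly (reusing X, y, w and params from above):

# XGBClassifier takes sample weights directly via its sklearn-style fit():
xgb.XGBClassifier(**params).fit(X, y, sample_weight=w)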

I have tried

stack_pipe.fit(X, y, sample_weights=w)

which throws:

ValueError: Pipeline.fit does not accept the sample_weights parameter. You can pass parameters to specific steps of your pipeline using the stepname__parameter format, e.g. `Pipeline.fit(X, y, logisticregression__sample_weight=sample_weight)`
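
Following the hint in the error message, the weights can be routed to the stacking step by its auto-generated name (make_pipeline lowercases the class name):

# Route the weights to the StackingClassifier step inside the pipeline:
stack_pipe.fit(X, y, stackingclassifier__sample_weight=w)

But that only moves the problem: StackingClassifier.fit then forwards sample_weight verbatim to each base estimator, and Pipeline.fit rejects a bare sample_weight parameter, so it still fails when the base estimators are pipelines.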

Solution

  • I also recently realized that stacking estimators can't handle sample-weighted Pipelines. I addressed the issue by subclassing the StackingRegressor and StackingClassifier classes from scikit-learn and overriding their fit() methods to better handle Pipelines. Take a look at the following:

    """Implement StackingClassifier that can handle sample-weighted Pipelines."""
    
    from copy import deepcopy

    import numpy as np
    # joblib supplies both Parallel and delayed; the old
    # `from sklearn.utils.fixes import delayed` import has been removed
    # from recent scikit-learn releases.
    from joblib import Parallel, delayed

    from sklearn.base import clone, is_classifier
    from sklearn.ensemble import StackingRegressor, StackingClassifier
    from sklearn.model_selection import cross_val_predict, check_cv
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import LabelEncoder
    from sklearn.utils import Bunch
    from sklearn.utils.multiclass import check_classification_targets
    
    ESTIMATOR_NAME_IN_PIPELINE = 'estimator'
    
    def new_fit_single_estimator(estimator, X, y, sample_weight=None,
                                 message_clsname=None, message=None):
        """Private function used to fit an estimator within a job."""
        if sample_weight is not None:
            try:
                if isinstance(estimator, Pipeline):
                    # determine name of final estimator
                    estimator_name = estimator.steps[-1][0]
                    kwargs = {estimator_name + '__sample_weight': sample_weight}
                    estimator.fit(X, y, **kwargs)
                else:
                    estimator.fit(X, y, sample_weight=sample_weight)
            except TypeError as exc:
                if "unexpected keyword argument 'sample_weight'" in str(exc):
                    raise TypeError(
                        "Underlying estimator {} does not support sample weights."
                        .format(estimator.__class__.__name__)
                    ) from exc
                raise
        else:
            estimator.fit(X, y)
        return estimator
    
    
    class FlexibleStackingClassifier(StackingClassifier):
    
        def __init__(self, estimators, final_estimator=None, *, cv=None,
                     n_jobs=None, passthrough=False, verbose=0):
            super().__init__(
                estimators=estimators,
                final_estimator=final_estimator,
                cv=cv,
                n_jobs=n_jobs,
                passthrough=passthrough,
                verbose=verbose
            )
    
        def fit(self, X, y, sample_weight=None):
            """Fit the estimators.
    
            Parameters
            ----------
            X : {array-like, sparse matrix} of shape (n_samples, n_features)
                Training vectors, where `n_samples` is the number of samples and
                `n_features` is the number of features.
            y : array-like of shape (n_samples,)
                Target values.
            sample_weight : array-like of shape (n_samples,) or default=None
                Sample weights. If None, then samples are equally weighted.
                Note that this is supported only if all underlying estimators
                support sample weights.
                .. versionchanged:: 0.23
                   when not None, `sample_weight` is passed to all underlying
                   estimators
    
            Returns
            -------
            self : object
            """
            # StackingClassifier.fit normally label-encodes y before running
            # the shared stacking logic; replicate that here so that predict(),
            # which calls self._le.inverse_transform, keeps working.
            check_classification_targets(y)
            self._le = LabelEncoder().fit(y)
            self.classes_ = self._le.classes_
            y = self._le.transform(y)

            # all_estimators contains all estimators, the one to be fitted and the
            # 'drop' string.
            names, all_estimators = self._validate_estimators()
            self._validate_final_estimator()
    
            stack_method = [self.stack_method] * len(all_estimators)
    
            # Fit the base estimators on the whole training data. Those
            # base estimators will be used in transform, predict, and
            # predict_proba. They are exposed publicly.
            self.estimators_ = Parallel(n_jobs=self.n_jobs)(
                delayed(new_fit_single_estimator)(clone(est), X, y, sample_weight)
                for est in all_estimators if est != 'drop'
            )
    
            self.named_estimators_ = Bunch()
            est_fitted_idx = 0
            for name_est, org_est in zip(names, all_estimators):
                if org_est != 'drop':
                    self.named_estimators_[name_est] = self.estimators_[
                        est_fitted_idx]
                    est_fitted_idx += 1
                else:
                    self.named_estimators_[name_est] = 'drop'
    
            # To train the meta-classifier using the most data as possible, we use
            # a cross-validation to obtain the output of the stacked estimators.
    
            # To ensure that the data provided to each estimator are the same, we
            # need to set the random state of the cv if there is one and we need to
            # take a copy.
            cv = check_cv(self.cv, y=y, classifier=is_classifier(self))
            if hasattr(cv, 'random_state') and cv.random_state is None:
                cv.random_state = np.random.RandomState()
    
            self.stack_method_ = [
                self._method_name(name, est, meth)
                for name, est, meth in zip(names, all_estimators, stack_method)
            ]
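            # cross_val_predict forwards fit params by key name only, so when
            # weights are given, every base estimator must be a Pipeline whose
            # final step is named ESTIMATOR_NAME_IN_PIPELINE ('estimator').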
            fit_params = ({f"{ESTIMATOR_NAME_IN_PIPELINE}__sample_weight": sample_weight}
                          if sample_weight is not None
                          else None)
            predictions = Parallel(n_jobs=self.n_jobs)(
                delayed(cross_val_predict)(clone(est), X, y, cv=deepcopy(cv),
                                           method=meth, n_jobs=self.n_jobs,
                                           fit_params=fit_params,
                                           verbose=self.verbose)
                for est, meth in zip(all_estimators, self.stack_method_)
                if est != 'drop'
            )
    
            # Only not None or not 'drop' estimators will be used in transform.
            # Remove the None from the method as well.
            self.stack_method_ = [
                meth for (meth, est) in zip(self.stack_method_, all_estimators)
                if est != 'drop'
            ]
    
            X_meta = self._concatenate_predictions(X, predictions)
            new_fit_single_estimator(self.final_estimator_, X_meta, y,
                                     sample_weight=sample_weight)
    
            return self
    
    
    class FlexibleStackingRegressor(StackingRegressor):
    
        def __init__(self, estimators, final_estimator=None, *, cv=None,
                     n_jobs=None, passthrough=False, verbose=0):
            super().__init__(
                estimators=estimators,
                final_estimator=final_estimator,
                cv=cv,
                n_jobs=n_jobs,
                passthrough=passthrough,
                verbose=verbose
            )
    
        def fit(self, X, y, sample_weight=None):
            """Fit the estimators.
    
            Parameters
            ----------
            X : {array-like, sparse matrix} of shape (n_samples, n_features)
                Training vectors, where `n_samples` is the number of samples and
                `n_features` is the number of features.
            y : array-like of shape (n_samples,)
                Target values.
            sample_weight : array-like of shape (n_samples,) or default=None
                Sample weights. If None, then samples are equally weighted.
                Note that this is supported only if all underlying estimators
                support sample weights.
                .. versionchanged:: 0.23
                   when not None, `sample_weight` is passed to all underlying
                   estimators
    
            Returns
            -------
            self : object
            """
            # all_estimators contains all estimators, the one to be fitted and the
            # 'drop' string.
            names, all_estimators = self._validate_estimators()
            self._validate_final_estimator()
    
            stack_method = [self.stack_method] * len(all_estimators)
    
            # Fit the base estimators on the whole training data. Those
            # base estimators will be used in transform, predict, and
            # predict_proba. They are exposed publicly.
            self.estimators_ = Parallel(n_jobs=self.n_jobs)(
                delayed(new_fit_single_estimator)(clone(est), X, y, sample_weight)
                for est in all_estimators if est != 'drop'
            )
    
            self.named_estimators_ = Bunch()
            est_fitted_idx = 0
            for name_est, org_est in zip(names, all_estimators):
                if org_est != 'drop':
                    self.named_estimators_[name_est] = self.estimators_[
                        est_fitted_idx]
                    est_fitted_idx += 1
                else:
                    self.named_estimators_[name_est] = 'drop'
    
            # To train the meta-classifier using the most data as possible, we use
            # a cross-validation to obtain the output of the stacked estimators.
    
            # To ensure that the data provided to each estimator are the same, we
            # need to set the random state of the cv if there is one and we need to
            # take a copy.
            cv = check_cv(self.cv, y=y, classifier=is_classifier(self))
            if hasattr(cv, 'random_state') and cv.random_state is None:
                cv.random_state = np.random.RandomState()
    
            self.stack_method_ = [
                self._method_name(name, est, meth)
                for name, est, meth in zip(names, all_estimators, stack_method)
            ]
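            # cross_val_predict forwards fit params by key name only, so when
            # weights are given, every base estimator must be a Pipeline whose
            # final step is named ESTIMATOR_NAME_IN_PIPELINE ('estimator').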
            fit_params = ({f"{ESTIMATOR_NAME_IN_PIPELINE}__sample_weight": sample_weight}
                          if sample_weight is not None
                          else None)
            predictions = Parallel(n_jobs=self.n_jobs)(
                delayed(cross_val_predict)(clone(est), X, y, cv=deepcopy(cv),
                                           method=meth, n_jobs=self.n_jobs,
                                           fit_params=fit_params,
                                           verbose=self.verbose)
                for est, meth in zip(all_estimators, self.stack_method_)
                if est != 'drop'
            )
    
            # Only not None or not 'drop' estimators will be used in transform.
            # Remove the None from the method as well.
            self.stack_method_ = [
                meth for (meth, est) in zip(self.stack_method_, all_estimators)
                if est != 'drop'
            ]
    
            X_meta = self._concatenate_predictions(X, predictions)
            new_fit_single_estimator(self.final_estimator_, X_meta, y,
                                     sample_weight=sample_weight)
    
            return self
    
    

    I included both the Regressor and Classifier versions, though you only seem to need the Classifier subclass.

    But a word of warning: you must give your estimators the same name in your pipelines, and that name must align with the ESTIMATOR_NAME_IN_PIPELINE field defined above; otherwise the code won't work. For example, here is an appropriately defined Pipeline instance using the name expected by the class definitions above:

    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import TweedieRegressor
    from sklearn.feature_selection import VarianceThreshold
    
    validly_named_pipeline = Pipeline([
        ('variance_threshold', VarianceThreshold()),
        ('scaler', StandardScaler()),
        ('estimator', TweedieRegressor())
    ])
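
    To tie this back to the question, here is a minimal usage sketch, assuming the FlexibleStackingClassifier defined above is in scope and reusing X, y, w, and params from the question:

    from sklearn.linear_model import LogisticRegression
    from sklearn.preprocessing import StandardScaler
    from sklearn.pipeline import Pipeline
    import xgboost as xgb

    log_reg_pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('estimator', LogisticRegression())  # must match ESTIMATOR_NAME_IN_PIPELINE
    ])

    stack = FlexibleStackingClassifier(
        estimators=[('lr', log_reg_pipe)],
        final_estimator=xgb.XGBClassifier(**params),
        passthrough=True,
        cv=2
    )
    stack.fit(X, y, sample_weight=w)  # weights reach both base and final estimators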
    

    This isn't ideal, but it's what I have for now and should work regardless.

    Edit: Just to be clear, when overriding the fit() method I copied the code from the scikit-learn repository and made the necessary changes, which amounted to only a few lines. Most of the pasted code is therefore not my original work, but that of the scikit-learn developers.