I have a stacking workflow similar to the following:
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import StackingClassifier
from sklearn.pipeline import make_pipeline
import xgboost as xgb

X = np.random.random(size=(1000, 5))
y = np.random.choice([0, 1], 1000)
w = np.random.random(size=(1000,))

scaler = StandardScaler()
log_reg = LogisticRegression()
params = {
    'n_estimators': 10,
    'max_depth': 3,
    'learning_rate': 0.1
}

log_reg_pipe = make_pipeline(
    scaler,
    log_reg
)

stack_pipe = make_pipeline(
    StackingClassifier(
        estimators=[('lr', log_reg_pipe)],
        final_estimator=xgb.XGBClassifier(**params),
        passthrough=True,
        cv=2
    )
)
I'd like to be able to pass sample weights to the XGBoost model. My question is: how do I set sample weights on the final estimator?
I have tried
stack_pipe.fit(X, y, sample_weights=w)
which throws
ValueError: Pipeline.fit does not accept the sample_weights parameter. You can pass parameters to specific steps of your pipeline using the stepname__parameter format, e.g. `Pipeline.fit(X, y, logisticregression__sample_weight=sample_weight)`
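The error message itself points at Pipeline's parameter routing: fit parameters are addressed to a step with the stepname__parameter syntax, and make_pipeline names each step after the lowercased class name, so the stacking step here should be named 'stackingclassifier'. Under that assumption, a sketch like the one below (note sample_weight, not sample_weights) at least reaches the StackingClassifier. The catch is that StackingClassifier.fit then forwards the weights to all underlying estimators, and a Pipeline used as a base estimator does not accept a bare sample_weight keyword, so this route breaks down exactly there:

# Hypothetical sketch: route the weights through the outer pipeline to the
# StackingClassifier step, which make_pipeline names 'stackingclassifier'.
# StackingClassifier.fit passes sample_weight on to *every* estimator it
# wraps, so each base estimator must accept sample_weight directly.
stack_pipe.fit(X, y, stackingclassifier__sample_weight=w)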
I also recently realized that stacking estimators can't handle sample-weighted Pipelines. I addressed the issue by subclassing the StackingRegressor and StackingClassifier classes from scikit-learn and overriding their fit() methods to better manage Pipelines. Take a look at the following:
"""Implement StackingClassifier that can handle sample-weighted Pipelines."""
from sklearn.ensemble import StackingRegressor, StackingClassifier
from copy import deepcopy
import numpy as np
from joblib import Parallel
from sklearn.base import clone
from sklearn.base import is_classifier, is_regressor
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import check_cv
from sklearn.utils import Bunch
from sklearn.utils.fixes import delayed
from sklearn.pipeline import Pipeline
ESTIMATOR_NAME_IN_PIPELINE = 'estimator'
def new_fit_single_estimator(estimator, X, y, sample_weight=None,
message_clsname=None, message=None):
"""Private function used to fit an estimator within a job."""
if sample_weight is not None:
try:
if isinstance(estimator, Pipeline):
# determine name of final estimator
estimator_name = estimator.steps[-1][0]
kwargs = {estimator_name + '__sample_weight': sample_weight}
estimator.fit(X, y, **kwargs)
else:
estimator.fit(X, y, sample_weight=sample_weight)
except TypeError as exc:
if "unexpected keyword argument 'sample_weight'" in str(exc):
raise TypeError(
"Underlying estimator {} does not support sample weights."
.format(estimator.__class__.__name__)
) from exc
raise
else:
estimator.fit(X, y)
return estimator
class FlexibleStackingClassifier(StackingClassifier):

    def __init__(self, estimators, final_estimator=None, *, cv=None,
                 n_jobs=None, passthrough=False, verbose=0):
        super().__init__(
            estimators=estimators,
            final_estimator=final_estimator,
            cv=cv,
            n_jobs=n_jobs,
            passthrough=passthrough,
            verbose=verbose
        )

    def fit(self, X, y, sample_weight=None):
        """Fit the estimators.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            Training vectors, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : array-like of shape (n_samples,)
            Target values.

        sample_weight : array-like of shape (n_samples,), default=None
            Sample weights. If None, then samples are equally weighted.
            Note that this is supported only if all underlying estimators
            support sample weights.

            .. versionchanged:: 0.23
               when not None, `sample_weight` is passed to all underlying
               estimators

        Returns
        -------
        self : object
        """
        # all_estimators contains all estimators, the one to be fitted and
        # the 'drop' string.
        names, all_estimators = self._validate_estimators()
        self._validate_final_estimator()

        stack_method = [self.stack_method] * len(all_estimators)

        # Fit the base estimators on the whole training data. Those
        # base estimators will be used in transform, predict, and
        # predict_proba. They are exposed publicly.
        self.estimators_ = Parallel(n_jobs=self.n_jobs)(
            delayed(new_fit_single_estimator)(clone(est), X, y, sample_weight)
            for est in all_estimators if est != 'drop'
        )

        self.named_estimators_ = Bunch()
        est_fitted_idx = 0
        for name_est, org_est in zip(names, all_estimators):
            if org_est != 'drop':
                self.named_estimators_[name_est] = self.estimators_[
                    est_fitted_idx]
                est_fitted_idx += 1
            else:
                self.named_estimators_[name_est] = 'drop'

        # To train the meta-classifier using the most data as possible, we
        # use a cross-validation to obtain the output of the stacked
        # estimators. To ensure that the data provided to each estimator are
        # the same, we need to set the random state of the cv if there is one
        # and we need to take a copy.
        cv = check_cv(self.cv, y=y, classifier=is_classifier(self))
        if hasattr(cv, 'random_state') and cv.random_state is None:
            cv.random_state = np.random.RandomState()

        self.stack_method_ = [
            self._method_name(name, est, meth)
            for name, est, meth in zip(names, all_estimators, stack_method)
        ]

        fit_params = ({f"{ESTIMATOR_NAME_IN_PIPELINE}__sample_weight": sample_weight}
                      if sample_weight is not None
                      else None)
        predictions = Parallel(n_jobs=self.n_jobs)(
            delayed(cross_val_predict)(clone(est), X, y, cv=deepcopy(cv),
                                       method=meth, n_jobs=self.n_jobs,
                                       fit_params=fit_params,
                                       verbose=self.verbose)
            for est, meth in zip(all_estimators, self.stack_method_)
            if est != 'drop'
        )

        # Only not None or not 'drop' estimators will be used in transform.
        # Remove the None from the method as well.
        self.stack_method_ = [
            meth for (meth, est) in zip(self.stack_method_, all_estimators)
            if est != 'drop'
        ]

        X_meta = self._concatenate_predictions(X, predictions)
        new_fit_single_estimator(self.final_estimator_, X_meta, y,
                                 sample_weight=sample_weight)

        return self
class FlexibleStackingRegressor(StackingRegressor):

    def __init__(self, estimators, final_estimator=None, *, cv=None,
                 n_jobs=None, passthrough=False, verbose=0):
        super().__init__(
            estimators=estimators,
            final_estimator=final_estimator,
            cv=cv,
            n_jobs=n_jobs,
            passthrough=passthrough,
            verbose=verbose
        )

    def fit(self, X, y, sample_weight=None):
        """Fit the estimators.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            Training vectors, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : array-like of shape (n_samples,)
            Target values.

        sample_weight : array-like of shape (n_samples,), default=None
            Sample weights. If None, then samples are equally weighted.
            Note that this is supported only if all underlying estimators
            support sample weights.

            .. versionchanged:: 0.23
               when not None, `sample_weight` is passed to all underlying
               estimators

        Returns
        -------
        self : object
        """
        # all_estimators contains all estimators, the one to be fitted and
        # the 'drop' string.
        names, all_estimators = self._validate_estimators()
        self._validate_final_estimator()

        stack_method = [self.stack_method] * len(all_estimators)

        # Fit the base estimators on the whole training data. Those
        # base estimators will be used in transform, predict, and
        # predict_proba. They are exposed publicly.
        self.estimators_ = Parallel(n_jobs=self.n_jobs)(
            delayed(new_fit_single_estimator)(clone(est), X, y, sample_weight)
            for est in all_estimators if est != 'drop'
        )

        self.named_estimators_ = Bunch()
        est_fitted_idx = 0
        for name_est, org_est in zip(names, all_estimators):
            if org_est != 'drop':
                self.named_estimators_[name_est] = self.estimators_[
                    est_fitted_idx]
                est_fitted_idx += 1
            else:
                self.named_estimators_[name_est] = 'drop'

        # To train the meta-estimator using the most data as possible, we
        # use a cross-validation to obtain the output of the stacked
        # estimators. To ensure that the data provided to each estimator are
        # the same, we need to set the random state of the cv if there is one
        # and we need to take a copy.
        cv = check_cv(self.cv, y=y, classifier=is_classifier(self))
        if hasattr(cv, 'random_state') and cv.random_state is None:
            cv.random_state = np.random.RandomState()

        self.stack_method_ = [
            self._method_name(name, est, meth)
            for name, est, meth in zip(names, all_estimators, stack_method)
        ]

        fit_params = ({f"{ESTIMATOR_NAME_IN_PIPELINE}__sample_weight": sample_weight}
                      if sample_weight is not None
                      else None)
        predictions = Parallel(n_jobs=self.n_jobs)(
            delayed(cross_val_predict)(clone(est), X, y, cv=deepcopy(cv),
                                       method=meth, n_jobs=self.n_jobs,
                                       fit_params=fit_params,
                                       verbose=self.verbose)
            for est, meth in zip(all_estimators, self.stack_method_)
            if est != 'drop'
        )

        # Only not None or not 'drop' estimators will be used in transform.
        # Remove the None from the method as well.
        self.stack_method_ = [
            meth for (meth, est) in zip(self.stack_method_, all_estimators)
            if est != 'drop'
        ]

        X_meta = self._concatenate_predictions(X, predictions)
        new_fit_single_estimator(self.final_estimator_, X_meta, y,
                                 sample_weight=sample_weight)

        return self
I included both the Regressor and Classifier versions, though you only seem to need the Classifier subclass.
But a word of warning: you must give the estimator step the same name in each of your pipelines, and that name must match the ESTIMATOR_NAME_IN_PIPELINE field defined above. Otherwise the code won't work. For example, here is an appropriately defined Pipeline instance using the same name as in the class definition script shown above:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import TweedieRegressor
from sklearn.feature_selection import VarianceThreshold

validly_named_pipeline = Pipeline([
    ('variance_threshold', VarianceThreshold()),
    ('scaler', StandardScaler()),
    ('estimator', TweedieRegressor())
])
This isn't ideal, but it's what I have for now and should work regardless.
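To tie it back to the original question, here is a rough, untested sketch of how the subclass might be wired up for the XGBoost use case. The only hard requirement imposed by the code above is that the final step inside each base Pipeline is named 'estimator'; the step name 'scaler', the estimator name 'lr', the toy data, and the XGBoost parameters are just illustrative:

# Hypothetical usage sketch for FlexibleStackingClassifier with sample weights.
# Each base estimator is a Pipeline whose final step is named 'estimator',
# matching ESTIMATOR_NAME_IN_PIPELINE above.
import numpy as np
import xgboost as xgb
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X = np.random.random(size=(1000, 5))
y = np.random.choice([0, 1], 1000)
w = np.random.random(size=(1000,))

lr_pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('estimator', LogisticRegression())  # step name must be 'estimator'
])

stack = FlexibleStackingClassifier(
    estimators=[('lr', lr_pipe)],
    final_estimator=xgb.XGBClassifier(n_estimators=10, max_depth=3,
                                      learning_rate=0.1),
    passthrough=True,
    cv=2
)

# sample_weight is routed to the base pipelines (via 'estimator__sample_weight')
# and passed directly to the final XGBoost estimator.
stack.fit(X, y, sample_weight=w)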
Edit: Just to be clear, when I overrode the fit() method I copied and pasted the code from the scikit-learn repository and made the necessary changes, which amounted to only a few lines. So much of the pasted code is not my original work, but that of the scikit-learn developers.