I'm trying to use GridSearchCV to optimize the hyperparameters of my classifier by optimizing a custom scoring function. The problem is that the scoring function depends on a cost that differs for each instance (the cost is also a feature of each instance). As shown in the example below, an additional array test_amt is needed that holds the cost of each instance (in addition to the arguments of a 'normal' scoring function, which only receives y and y_pred).
def calculate_costs(y_test, y_test_pred, test_amt):
    # Sum the cost over all instances; test_amt holds the cost of each instance
    cost = 0
    for i in range(len(y_test)):  # start at 0 so the first instance is not skipped
        y = y_test.iloc[i]
        y_pred = y_test_pred.iloc[i]
        x_amt = test_amt.iloc[i]
        if y == 0 and y_pred == 0:
            cost -= x_amt * 1.1
        elif y == 0 and y_pred == 1:
            cost += x_amt
        elif y == 1 and y_pred == 0:
            cost += x_amt * 1.1
        elif y == 1 and y_pred == 1:
            cost += 0
        else:
            print("ERROR! No cost could be assigned to the instance: " + str(i))
    return cost
When I call this function after training with the three arrays, it correctly calculates the total cost that results from a model. However, integrating it into GridSearchCV is difficult because a scoring function only expects two parameters. While it is possible to pass additional kwargs to the scorer, I have no idea how to pass a subset that depends on the split GridSearchCV is currently working on.
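For reference, the same cost rule can be written without a per-row loop. This is only a minimal sketch using NumPy boolean masks; the name calculate_costs_vectorized is hypothetical, and unlike the loop it silently ignores labels other than 0 and 1 instead of printing an error.

```python
import numpy as np


def calculate_costs_vectorized(y_true, y_pred, amounts):
    """Total cost using the same rule as calculate_costs (hypothetical helper)."""
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    amounts = np.asarray(amounts, dtype=float)

    true_negative = (y_true == 0) & (y_pred == 0)   # cost -= amount * 1.1
    false_positive = (y_true == 0) & (y_pred == 1)  # cost += amount
    false_negative = (y_true == 1) & (y_pred == 0)  # cost += amount * 1.1
    # True positives add nothing; rows with other labels are ignored here.

    return float(
        -1.1 * amounts[true_negative].sum()
        + amounts[false_positive].sum()
        + 1.1 * amounts[false_negative].sum()
    )
```

It still needs the third array, so it does not solve the GridSearchCV problem by itself; it only makes the cost rule easier to read and faster to evaluate.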
What I have thought of / tried so far:
Wrapping the whole pipeline in a class with a globally stored pandas.Series object that holds the cost of each instance under its index. In theory, the cost of an instance could then be looked up by that same index. Unfortunately, this does not work because scikit-learn converts everything into NumPy arrays, so the index is lost.
def calculate_costs_class(self, y_test, y_test_pred):
    # Method of the wrapper class; self.test_amt is the stored pandas.Series of costs
    cost = 0
    for index, _ in y_test.items():  # Series.iteritems() was removed in pandas 2.0
        y = y_test.loc[index]
        y_pred = y_test_pred.loc[index]
        x_amt = self.test_amt.loc[index]
        if y == 0 and y_pred == 0:
            cost += (x_amt * (-1)) + 5 + (x_amt * 0.1)  # -revenue, +shipping, +fees
        elif y == 0 and y_pred == 1:
            cost += x_amt  # +revenue
        elif y == 1 and y_pred == 0:
            cost += x_amt + 5 + (x_amt * 0.1) + 5  # +revenue, +shipping, +fees, +charge cost
        elif y == 1 and y_pred == 1:
            cost += 0  # nothing
        else:
            print("ERROR! No cost could be assigned to the instance: " + str(index))
    return cost
Creating a custom PseudoInt class to use as the data type of the label. It inherits from int but can also store the cost of an instance (while retaining all of int's behavior in comparisons and logical operations). Although this works outside of scikit-learn, the check_classification_targets method in scikit-learn raises ValueError: Unknown label type: 'unknown'.
class PseudoInt(int):
    def __new__(cls, x, cost, *args, **kwargs):
        instance = int.__new__(cls, x, *args, **kwargs)
        instance.cost = cost
        return instance
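To make the intended behavior concrete, here is a small self-contained sketch of how such a label acts outside of scikit-learn. The variable names are only illustrative. Note that arithmetic on an int subclass returns a plain int, so the cost only survives as long as the original object is kept.

```python
class PseudoInt(int):
    """An int that can also carry the cost of its instance."""

    def __new__(cls, x, cost, *args, **kwargs):
        instance = int.__new__(cls, x, *args, **kwargs)
        instance.cost = cost
        return instance


label = PseudoInt(1, cost=12.5)

is_positive = (label == 1)     # compares like a normal int
carried_cost = label.cost      # the extra attribute travels with the label

derived = label + 0            # arithmetic falls back to a plain int
derived_has_cost = hasattr(derived, "cost")
```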
I haven't tried this but thought of it: since the cost is also a feature in the instance set X, it is also available in the __call__ method of the _PredictScorer(_BaseScorer) class in scikit-learn's scorer.py. If I rewrite the __call__ method to also pass the cost array, as a subset of X, to score_func, I would have the cost there as well.
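A minimal sketch of that idea that avoids touching scorer.py: the scoring argument of GridSearchCV also accepts a callable with the signature (estimator, X, y), which is called with the validation part of each split. The names cost_from_features, AMOUNT_COLUMN and AlwaysPositive are hypothetical, the column position of the cost feature is an assumption, and the cost rule is the one from calculate_costs above.

```python
import numpy as np

AMOUNT_COLUMN = 0  # assumption: position of the cost feature inside X


def cost_from_features(estimator, X, y):
    """Scorer with the (estimator, X, y) signature; returns the negative total cost."""
    X = np.asarray(X)
    y = np.asarray(y)
    y_pred = np.asarray(estimator.predict(X))
    amounts = X[:, AMOUNT_COLUMN].astype(float)  # cost of each instance in this split

    true_negative = (y == 0) & (y_pred == 0)
    false_positive = (y == 0) & (y_pred == 1)
    false_negative = (y == 1) & (y_pred == 0)

    cost = (
        -1.1 * amounts[true_negative].sum()
        + amounts[false_positive].sum()
        + 1.1 * amounts[false_negative].sum()
    )
    return -float(cost)  # higher scores are treated as better, so negate the cost


class AlwaysPositive:
    """Stand-in estimator, used only to exercise the scorer without training."""

    def predict(self, X):
        return np.ones(len(X), dtype=int)


X_demo = np.array([[10.0, 1.0], [20.0, 0.0]])
y_demo = np.array([0, 1])
demo_score = cost_from_features(AlwaysPositive(), X_demo, y_demo)
```

Passed as scoring=cost_from_features, the callable receives exactly the rows of X that belong to the current validation fold, so the cost subset follows the split automatically. This only works if the cost column is still present and untransformed in the X that reaches the scorer.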
Or: I could just implement everything myself.
Is there an "easier" solution?
I found a way to solve the problem by following the second approach from the question: passing scikit-learn a PseudoInt that behaves like a normal integer in comparisons and mathematical operations. It also acts as a wrapper around the int, so instance variables (such as the cost of an instance) can be stored on it. As already stated in the question, this causes scikit-learn to detect that the values inside the passed label array are of type object rather than int. So I replaced the check in the type_of_target(y) function in line 273 of scikit-learn's multiclass.py so that it returns 'binary' even though the input does not pass the check. Scikit-learn then treats the whole problem, as it should, as a binary classification problem. Lines 269-273 of type_of_target(y) in multiclass.py now look like this:
# Invalid inputs
if y.ndim > 2 or (y.dtype == object and len(y) and
                  not isinstance(y.flat[0], string_types)):
    # return 'unknown'  # [[[1, 2]]] or [obj_1] and not ["label_1"]
    return 'binary'  # Sneaky, modified to force binary classification.
My code then looks like this:
import sklearn
import sklearn.model_selection
import sklearn.base
import sklearn.metrics
import numpy as np
import sklearn.tree
import sklearn.feature_selection
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.metrics import make_scorer  # public import path; sklearn.metrics.scorer is a private module
class PseudoInt(int):
    # Behaves like an integer, but is able to store instance variables
    pass
def grid_search(x, y_normal, x_amounts):
    # Change the label set to a np array containing pseudo ints with the costs associated with the instances
    y = np.empty(len(y_normal), dtype=object)
    for position, (index, value) in enumerate(y_normal.items()):
        new_int = PseudoInt(value)
        new_int.cost = x_amounts.loc[index]  # Here the cost is added to the label
        y[position] = new_int  # fill by position so any pandas index works
    # Normal train test split
    x_train, x_test, y_train, y_test = sklearn.model_selection.train_test_split(x, y, test_size=0.2)
    # Classifier
    clf = sklearn.tree.DecisionTreeClassifier()
    # Custom scorer with the cost function below (lower cost is better)
    cost_scorer = make_scorer(cost_function, greater_is_better=False)
    # Define pipeline
    pipe = Pipeline([('clf', clf)])
    # Grid search grid with any hyper parameters or other settings
    # (parameter names must match the pipeline step name, here 'clf')
    param_grid = [
        {'clf__criterion': ['gini', 'entropy']}
    ]
    # Grid search and pass the custom scorer function
    gs = GridSearchCV(estimator=pipe,
                      param_grid=param_grid,
                      scoring=cost_scorer,
                      n_jobs=1,
                      cv=5,
                      refit=True)
    # Run grid search and refit with best hyper parameters
    # (.values replaces DataFrame.as_matrix(), which was removed in pandas 1.0)
    gs = gs.fit(x_train.values, y_train)
    print("Best Parameters: " + str(gs.best_params_))
    print("Best Score (negative cost): " + str(gs.best_score_))
    # Predict with retrained model (with best parameters)
    y_test_pred = gs.predict(x_test.values)
    # Get scores (also cost score)
    get_scores(y_test, y_test_pred)
def get_scores(y_test, y_test_pred):
    print("Getting scores")
    print("SCORES")
    precision = sklearn.metrics.precision_score(y_test, y_test_pred)
    recall = sklearn.metrics.recall_score(y_test, y_test_pred)
    f1_score = sklearn.metrics.f1_score(y_test, y_test_pred)
    accuracy = sklearn.metrics.accuracy_score(y_test, y_test_pred)
    print("Precision " + str(precision))
    print("Recall " + str(recall))
    print("Accuracy " + str(accuracy))
    print("F1_Score " + str(f1_score))
    print("COST")
    cost = cost_function(y_test, y_test_pred)
    print("Cost Savings " + str(-cost))
    print("CONFUSION MATRIX")
    cnf_matrix = sklearn.metrics.confusion_matrix(y_test, y_test_pred)
    cnf_matrix = cnf_matrix.astype('float') / cnf_matrix.sum(axis=1)[:, np.newaxis]
    print(cnf_matrix)
def cost_function(y_test, y_test_pred):
    """
    Calculates total cost based on TP, FP, TN, FN and the cost of a certain instance
    :param y_test: Has to be an array of PseudoInts containing the cost of each instance
    :param y_test_pred: Any array of PseudoInts or ints
    :return: Returns total cost
    """
    cost = 0
    for index in range(len(y_test)):
        # print(index)
        y = y_test[index]
        y_pred = y_test_pred[index]
        x_amt = y.cost
        if y == 0 and y_pred == 0:
            cost -= x_amt  # Reducing cost by x_amt
        elif y == 0 and y_pred == 1:
            cost += x_amt  # Wrong classification adds cost
        elif y == 1 and y_pred == 0:
            cost += x_amt + 5  # Wrong classification adds cost and fee
        elif y == 1 and y_pred == 1:
            cost += 0  # No cost
        else:
            raise ValueError("No cost could be assigned to the instance: " + str(index))
    # print("Cost: " + str(cost))
    return cost
Instead of changing the files in the package directly (which is kind of dirty), I now added the following to the first import lines of my project:
import sklearn.utils.multiclass
def return_binary(y):
    return "binary"
sklearn.utils.multiclass.type_of_target = return_binary
This overwrites the type_of_target(y) function in sklearn.utils.multiclass so that it always returns "binary". Note that this has to come before all the other sklearn imports, because modules that import type_of_target by name keep a reference to whichever function was in place when they were first imported.