Tags: python, numpy, scikit-learn, decision-tree, scoring

How to create a customized scoring function in scikit-learn for scoring a set of instances based on their individual properties?


I'm trying to use GridSearchCV to optimize the hyperparameters of my classifier by optimizing a custom scoring function. The problem is that the scoring function depends on a cost that differs for each instance (the cost is also a feature of each instance). As shown in the example below, an additional array test_amt is needed that holds the cost of each instance (in addition to the 'normal' scoring function arguments y and y_pred).

    def calculate_costs(y_test, y_test_pred, test_amt):
        cost = 0

        for i in range(len(y_test)):  # start at 0 so the first instance is not skipped
            y = y_test.iloc[i]
            y_pred = y_test_pred.iloc[i]
            x_amt = test_amt.iloc[i]

            if y == 0 and y_pred == 0:
                cost -= x_amt * 1.1
            elif y == 0 and y_pred == 1:
                cost += x_amt
            elif y == 1 and y_pred == 0:
                cost += x_amt * 1.1
            elif y == 1 and y_pred == 1:
                cost += 0
            else:
                print("ERROR! No cost could be assigned to the instance: " + str(i))
        return cost

When I call this function after training with the three arrays, it calculates the total cost that results from a model. However, integrating it into GridSearchCV is difficult, because the scoring function only expects two parameters. While it is possible to pass additional kwargs to the scorer, I have no idea how to pass a subset that depends on the split GridSearchCV is currently working on.
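
For reference, the four cost cases above can also be computed on plain NumPy arrays, which is the form in which scikit-learn hands labels and predictions to a scorer. This is only a sketch: the name calculate_costs_vectorized and the sample values are made up for illustration, and it does not by itself solve the problem of selecting the right amounts for each split.

```python
import numpy as np


def calculate_costs_vectorized(y_true, y_pred, amounts):
    """Total cost using the same four cases as calculate_costs above."""
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    amounts = np.asarray(amounts, dtype=float)

    per_instance = np.select(
        [
            (y_true == 0) & (y_pred == 0),  # cost -= x_amt * 1.1
            (y_true == 0) & (y_pred == 1),  # cost += x_amt
            (y_true == 1) & (y_pred == 0),  # cost += x_amt * 1.1
            (y_true == 1) & (y_pred == 1),  # cost += 0
        ],
        [-1.1 * amounts, amounts, 1.1 * amounts, np.zeros_like(amounts)],
        default=np.nan,  # marks label combinations that match no case
    )
    if np.isnan(per_instance).any():
        raise ValueError("No cost could be assigned to some instances")
    return per_instance.sum()


# Illustrative values only: one instance per case.
y_true = [0, 0, 1, 1]
y_pred = [0, 1, 0, 1]
amounts = [10.0, 20.0, 30.0, 40.0]
total = calculate_costs_vectorized(y_true, y_pred, amounts)
print(total)
```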

What I have thought of / tried so far:

  1. Wrapping the whole pipeline in a class with a globally stored pandas.Series object that stores the cost of each instance with an index. Then, it would theoretically be possible to reference the cost of an instance by calling it with the same index. Unfortunately, this does not work as scikit-learn transforms everything into a numpy array.

    def calculate_costs_class(self, y_test, y_test_pred):  # method of the wrapper class
        cost = 0
        for index, _ in y_test.items():  # Series.iteritems() was removed in pandas 2.0
            y = y_test.loc[index]
            y_pred = y_test_pred.loc[index]
            x_amt = self.test_amt.loc[index]
    
            if y == 0 and y_pred == 0:
                cost += (x_amt * (-1)) + 5 + (x_amt * 0.1)  # -revenue, +shipping, +fees
            elif y == 0 and y_pred == 1:
                cost += x_amt  # +revenue
            elif y == 1 and y_pred == 0:
                cost += x_amt + 5 + (x_amt * 0.1) + 5  # +revenue, +shipping, +fees, +charge cost
            elif y == 1 and y_pred == 1:
                cost += 0  # nothing
            else:
                print("ERROR! No cost could be assigned to the instance: " + str(index))
        return cost
    
  2. Creating a custom PseudoInt class as the data type of the label, which inherits all properties from int but can also store the cost of an instance (while retaining all its properties for logical operations). While this works outside of scikit-learn, the check_classification_targets function in scikit-learn raises a ValueError: Unknown label type: 'unknown' error.

    class PseudoInt(int):
        def __new__(cls, x, cost, *args, **kwargs):
            instance = int.__new__(cls, x, *args, **kwargs)
            instance.cost = cost
            return instance
    
  3. I haven't tried but thought of: Since the cost is also a feature in the instance set X, it is also available in the __call__ method of _PredictScorer(_BaseScorer) class in Scikit's scorer.py. If I reprogram the call function to also pass the cost array as a subset of X to the score_func I would also have the cost.

  4. Or: I could just implement everything myself.

Is there an "easier" solution?
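
Regarding idea 3: the scoring parameter of GridSearchCV also accepts a callable with the signature scorer(estimator, X, y), so the scorer receives the X of the current validation split without any change to scikit-learn's scorer classes. The following is a minimal sketch under stated assumptions: the amount is assumed to sit in column 0 of X (AMOUNT_COL is a hypothetical name), the data is synthetic, and the cost cases are the ones from calculate_costs above. The score is the negated cost because GridSearchCV maximizes the score.

```python
import numpy as np
from sklearn.model_selection import GridSearchCV
from sklearn.tree import DecisionTreeClassifier

AMOUNT_COL = 0  # assumption: the cost/amount feature is column 0 of X


def cost_scorer(estimator, X, y):
    """Callable scorer (estimator, X, y) that returns the negated total cost."""
    X = np.asarray(X)
    y = np.asarray(y)
    y_pred = estimator.predict(X)
    amt = X[:, AMOUNT_COL].astype(float)  # amounts of exactly this split

    cost = np.select(
        [
            (y == 0) & (y_pred == 0),
            (y == 0) & (y_pred == 1),
            (y == 1) & (y_pred == 0),
        ],
        [-1.1 * amt, amt, 1.1 * amt],
        default=0.0,  # y == 1 and y_pred == 1: no cost (binary labels assumed)
    )
    return -cost.sum()  # higher is better, so return the negated cost


# Synthetic data, for illustration only.
rng = np.random.default_rng(0)
X = rng.uniform(1, 100, size=(200, 3))
y = (X[:, 1] + X[:, 2] > 100).astype(int)

gs = GridSearchCV(
    DecisionTreeClassifier(random_state=0),
    param_grid={"criterion": ["gini", "entropy"], "max_depth": [2, 4]},
    scoring=cost_scorer,
    cv=5,
)
gs.fit(X, y)
print(gs.best_params_, gs.best_score_)
```

With this approach the amount remains an ordinary feature, so the labels stay plain integers and no label-type check is triggered.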


Solution

  • I found a way to solve the problem by following the second approach from the question: passing scikit-learn a PseudoInt that behaves like a normal integer in comparisons and arithmetic, but also acts as a wrapper in which instance variables (such as the cost of an instance) can be stored. As already stated in the question, this causes scikit-learn to recognize that the values in the label array are of type object rather than int. So I changed the return value of the check in the type_of_target(y) function of scikit-learn's multiclass.py (line 273) so that it returns 'binary' even though the labels fail the check. Scikit-learn then treats the whole problem as a binary classification problem, as it should. Lines 269-273 of type_of_target(y) in multiclass.py now look like this:

    # Invalid inputs
    if y.ndim > 2 or (y.dtype == object and len(y) and
                      not isinstance(y.flat[0], string_types)):
        # return 'unknown'  # [[[1, 2]]] or [obj_1] and not ["label_1"]
        return 'binary' # Sneaky, modified to force binary classification.
    

    My code then looks like this:

    import sklearn
    import sklearn.model_selection
    import sklearn.base
    import sklearn.metrics
    import numpy as np
    import sklearn.tree
    import sklearn.feature_selection
    from sklearn.model_selection import GridSearchCV
    from sklearn.pipeline import Pipeline
    from sklearn.metrics import make_scorer  # sklearn.metrics.scorer is no longer importable in recent versions
    
    
    class PseudoInt(int):
        # Behaves like an integer, but is able to store instance variables
        pass
    
    
    def grid_search(x, y_normal, x_amounts):
        # Change the label set to a np array containing pseudo ints with the costs associated with the instances
        y = np.empty(len(y_normal), dtype=PseudoInt)
        for index, value in y_normal.items():  # Series.iteritems() was removed in pandas 2.0
            new_int = PseudoInt(value)
            new_int.cost = x_amounts.loc[index]  # Here the cost is added to the label
            y[index] = new_int
    
        # Normal train test split
        x_train, x_test, y_train, y_test = sklearn.model_selection.train_test_split(x, y, test_size=0.2)
    
        # Classifier
        clf = sklearn.tree.DecisionTreeClassifier()
    
        # Custom scorer with the cost function below (lower cost is better)
        cost_scorer = make_scorer(cost_function, greater_is_better=False)  # Custom cost function (Lower cost is better)
    
        # Define pipeline
        pipe = Pipeline([('clf', clf)])
    
        # Grid search grid with any hyper parameters or other settings
        param_grid = [
            {'clf__criterion': ['gini', 'entropy']}  # prefix must match the pipeline step name 'clf'
        ]
    
        # Grid search and pass the custom scorer function
        gs = GridSearchCV(estimator=pipe,
                          param_grid=param_grid,
                          scoring=cost_scorer,
                          n_jobs=1,
                          cv=5,
                          refit=True)
    
        # run grid search and refit with best hyper parameters
        gs = gs.fit(x_train.values, y_train)  # DataFrame.as_matrix() was removed in pandas 1.0
        print("Best Parameters: " + str(gs.best_params_))
        print('Best Score (negated cost): ' + str(gs.best_score_))
    
        # Predict with retrained model (with best parameters)
        y_test_pred = gs.predict(x_test.values)
    
        # Get scores (also cost score)
        get_scores(y_test, y_test_pred)
    
    
    def get_scores(y_test, y_test_pred):
        print("Getting scores")
    
        print("SCORES")
        precision = sklearn.metrics.precision_score(y_test, y_test_pred)
        recall = sklearn.metrics.recall_score(y_test, y_test_pred)
        f1_score = sklearn.metrics.f1_score(y_test, y_test_pred)
        accuracy = sklearn.metrics.accuracy_score(y_test, y_test_pred)
        print("Precision      " + str(precision))
        print("Recall         " + str(recall))
        print("Accuracy       " + str(accuracy))
        print("F1_Score       " + str(f1_score))
    
        print("COST")
        cost = cost_function(y_test, y_test_pred)
        print("Cost Savings   " + str(-cost))
    
        print("CONFUSION MATRIX")
        cnf_matrix = sklearn.metrics.confusion_matrix(y_test, y_test_pred)
        cnf_matrix = cnf_matrix.astype('float') / cnf_matrix.sum(axis=1)[:, np.newaxis]
        print(cnf_matrix)
    
    
    def cost_function(y_test, y_test_pred):
        """
        Calculates total cost based on TP, FP, TN, FN and the cost of a certain instance
        :param y_test: Has to be an array of PseudoInts containing the cost of each instance
        :param y_test_pred: Any array of PseudoInts or ints
        :return: Returns total cost
        """
        cost = 0
    
        for index in range(len(y_test)):
            # print(index)
            y = y_test[index]
            y_pred = y_test_pred[index]
            x_amt = y.cost
    
            if y == 0 and y_pred == 0:
                cost -= x_amt  # Reducing cost by x_amt
            elif y == 0 and y_pred == 1:
                cost += x_amt  # Wrong classification adds cost
            elif y == 1 and y_pred == 0:
                cost += x_amt + 5 # Wrong classification adds cost and fee
            elif y == 1 and y_pred == 1:
                cost += 0  # No cost
            else:
                raise ValueError("No cost could be assigned to the instance: " + str(index))
    
        # print("Cost: " + str(cost))
        return cost
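
A note on the sign convention used above: with greater_is_better=False, make_scorer negates the value returned by the cost function, so gs.best_score_ in the code above is the negated cost, not an accuracy. The sketch below shows this on a toy case; toy_cost and the data are hypothetical and only serve to make the sign flip visible.

```python
import numpy as np
from sklearn.dummy import DummyClassifier
from sklearn.metrics import make_scorer


def toy_cost(y_true, y_pred):
    """Hypothetical cost: one unit per misclassified instance."""
    return float(np.sum(np.asarray(y_true) != np.asarray(y_pred)))


scorer = make_scorer(toy_cost, greater_is_better=False)

X = np.zeros((4, 1))
y = np.array([0, 0, 0, 1])
clf = DummyClassifier(strategy="most_frequent").fit(X, y)  # always predicts 0

raw_cost = toy_cost(y, clf.predict(X))  # one instance is misclassified
score = scorer(clf, X, y)               # the scorer returns the negated cost
print(raw_cost, score)
```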
    

    UPDATE

    Instead of changing the files in the package directly (which is kind of dirty), I now added to the first import lines of my project:

    import sklearn.utils.multiclass
    
    def return_binary(y):
        return "binary"
    
    sklearn.utils.multiclass.type_of_target = return_binary
    

    This overwrites the type_of_target(y) function in sklearn.utils.multiclass so that it always returns 'binary'. Note that this has to come before all the other sklearn imports.
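
Why the order matters can be illustrated without scikit-learn. Assuming some modules bind the function with a from-import (which may vary between scikit-learn versions), any such module imported before the patch keeps its own reference to the original function. The sketch below uses a stand-in module named lib, which is purely hypothetical.

```python
import types

# Stand-in for sklearn.utils.multiclass (hypothetical, for illustration only).
lib = types.ModuleType("lib")
lib.type_of_target = lambda y: "unknown"

# A consumer that ran `from lib import type_of_target` BEFORE the patch
# holds its own reference to the original function object.
early_ref = lib.type_of_target

# The patch rebinds the attribute on the module only.
lib.type_of_target = lambda y: "binary"

# A consumer that imports AFTER the patch picks up the replacement.
late_ref = lib.type_of_target

print(early_ref([1]))  # still the original function
print(late_ref([1]))   # the patched function
```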