Tags: optimization, scipy, linear-programming

Using Scipy to Find Optimal Pairs of a Cost Matrix with an Additional Constraint Matrix


I'm working on an optimization problem where I have a set of pairs that need to be assigned, each with associated costs.

I have been using scipy.optimize.linear_sum_assignment up to now.

I would now like to increase the complexity of the problem by controlling the selected pairs with an additional constraint matrix. I still want to minimize the cost of the selected pairs, but now subject to the constraint that the average value of the selected pairs, with respect to a second matrix, is kept close to some threshold.

Here's an example using NumPy arrays:


import numpy as np

cost_matrix = np.array([[3.1, 3.0], [3.0, 3.1]])
secondary_matrix = np.array([[1.0, 10.0], [10.0, 1.0]])

# Define the threshold for the average value in secondary_matrix
average_threshold = 6.0

In this toy example, cost_matrix is a square matrix (rarely the case in practice) where cost_matrix[i][j] represents the cost of pairing pairs[i] and pairs[j]. secondary_matrix is a matrix of the same shape, where secondary_matrix[i][j] corresponds to the secondary cost of pairing pairs[i] and pairs[j]. The average_threshold sets the maximum allowable average secondary cost over the selected pairs.

Using just scipy.optimize.linear_sum_assignment would select the pairs [0, 1] and [1, 0], whereas controlling for the secondary matrix as well would give the diagonal entries as the optimal pairs. This is what I want to achieve in the optimisation.
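
To make this concrete, here is the unconstrained result on the toy matrices (using the arrays defined above):

from scipy.optimize import linear_sum_assignment

row_ind, col_ind = linear_sum_assignment(cost_matrix)
print(row_ind, col_ind)                           # [0 1] [1 0]: the two 3.0 entries
print(cost_matrix[row_ind, col_ind].sum())        # 6.0
print(secondary_matrix[row_ind, col_ind].mean())  # 10.0, above average_threshold
# The diagonal assignment costs 6.2 but has a secondary average of 1.0,
# which is below the 6.0 cap.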

I'm seeking guidance on suitable optimization approaches or libraries that can handle this type of problem efficiently. So far, I have implemented this approach:

import numpy as np
from scipy.optimize import linear_sum_assignment, minimize


def match_minimize(cost_matrix, constraint_matrix, average_threshold):
    def combined_objective(x):
        # Perturb the costs by x and solve the resulting assignment problem
        row_indices, col_indices = linear_sum_assignment(cost_matrix + x.reshape(cost_matrix.shape))
        total_cost = cost_matrix[row_indices, col_indices].sum()
        # Penalty applies only when the average secondary cost exceeds the threshold
        selected_pairs_average = np.mean(constraint_matrix[row_indices, col_indices])
        return total_cost + max(0, selected_pairs_average - average_threshold)

    # Initial guess for the optimization variable x
    x0 = np.zeros_like(cost_matrix).flatten()

    # Solve the optimization problem
    result = minimize(combined_objective, x0, method="COBYLA")

    # Extract the optimal assignments
    row_indices, col_indices = linear_sum_assignment(cost_matrix + result.x.reshape(cost_matrix.shape))
    return row_indices, col_indices

In this approach, I add a penalty to the cost whenever the average of the selected values of the secondary matrix is above the average threshold.
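
For reference, the call on the toy example looks like this (passing secondary_matrix as the constraint matrix):

row_indices, col_indices = match_minimize(cost_matrix, secondary_matrix, average_threshold)
print(list(zip(row_indices, col_indices)))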

It seems to work for small matrices, but for my problem I need to work with matrices containing thousands of elements, and I get a Python segfault as soon as the matrix has more than 255 columns.

I am a beginner in optimisation and so am not sure if I'm on the right track or not.

Does my approach with match_minimize seem reasonable? Is there a way to get this to work with larger matrices? Or should I use a different approach?

All recommendations and advice greatly appreciated. Thanks


Solution

  • Pretty straightforward MILP (mixed-integer linear program) construction, though you need a weight parameter to decide how important the mean error is relative to the primary assignment cost. Try changing mean_err_weight in the following between 0.5 and 1.0 to observe the difference.

    import numpy as np
    from scipy.optimize import milp, LinearConstraint
    import scipy.sparse
    
    
    def mean_assignment(
        asn_cost: np.ndarray,
        mean_cost: np.ndarray,
        target_mean: float,
        mean_err_weight: float = 1,
    ) -> tuple[
        np.ndarray,  # assignments
        float,  # mean
        float,  # error from mean
    ]:
        """
        Perform linear sum assignment with a secondary objective of minimizing deviation from a mean;
        in other words,
        minimize sum(asn_cost[assign]) + mean_err_weight*abs(sum(mean_cost[assign])/n - target_mean)
    
        :param asn_cost: Assignment costs, 2D
        :param mean_cost: Mean costs; must be same shape as assignment costs
        :param target_mean: Number to which the mean of mean_cost[assign] will be pulled
        :param mean_err_weight: Optimization objective weight for mean cost
        :return: Assignment array of same shape as asn_cost; resulting mean of assigned mean_cost; and
            resulting error from mean
        """
    
        if asn_cost.shape != mean_cost.shape:
            raise ValueError('Shape mismatch')
    
        # Ensure a long matrix during optimization
        transpose = asn_cost.shape[0] < asn_cost.shape[1]
        if transpose:
            asn_cost = asn_cost.T
            mean_cost = mean_cost.T
    
        m, n = asn_cost.shape
    
        # Variables:
        #    m*n assignments, binary
        #    mean deviance, continuous
    
        cost_coeff = np.concatenate((
            asn_cost.ravel(),    # Assignment costs
            (mean_err_weight,),  # Cost of error from mean
        ))
        integrality = np.ones(m*n + 1)  # Assignments are binary
        integrality[-1] = 0             # Mean error is continuous
    
        # There must be at most one assignment per row (Kronecker)
        row_excl = LinearConstraint(
            A=scipy.sparse.hstack(
                (
                    scipy.sparse.kron(scipy.sparse.eye(m), np.ones((1, n))),
                    scipy.sparse.csc_array((m, 1)),
                ),
                format='csc',
            ),
            lb=np.zeros(m),
            ub=np.ones(m),
        )
    
        # There must be exactly one assignment per column
        col_excl = LinearConstraint(
            A=scipy.sparse.hstack(
                (scipy.sparse.eye(n),)*m +
                (scipy.sparse.csc_array((n, 1)),),
                format='csc',
            ),
            lb=np.ones(n),
            ub=np.ones(n),
        )
    
        # The error from mean is taken as the absolute
        # err >= +sum(secondary[assign])/sum(assign) - target
        # err >= -sum(secondary[assign])/sum(assign) + target
        # cost_matrix*assign + ... - n*err <= n*target
        # cost_matrix*assign + ... + n*err >= n*target
        mean_err_abs = LinearConstraint(
            A=scipy.sparse.bmat([
                [mean_cost.ravel(), [-n]],
                [mean_cost.ravel(), [+n]],
            ], format='csc'),
            lb=[-np.inf, n*target_mean],
            ub=[n*target_mean, +np.inf],
        )
    
        result = milp(
            c=cost_coeff, integrality=integrality,
            constraints=(row_excl, col_excl, mean_err_abs),
        )
        if not result.success:
            raise ValueError(result.message)
        # Round before casting: the MILP solver may return near-integral floats
        assignments = result.x[:-1].round().reshape((m, n)).astype(int)
        mean_err = result.x[-1]
        mean = mean_cost[assignments.astype(bool)].mean()
        if transpose:
            assignments = assignments.T
        return assignments, mean, mean_err
    
    
    def test() -> None:
        assign, mean, mean_err = mean_assignment(
            asn_cost=np.array([
                [5, 2],
                [3, 4],
                [6, 7],
            ]),
            mean_cost=np.array([
                [1., 10.],
                [10., 1.],
                [2.,  2.],
            ]),
            target_mean=6.,
            mean_err_weight=1,  # change to 0.5 to see cost tradeoff
        )
    
        print(assign)
        print('mean =', mean)
        print('mean err =', mean_err)
    
    
    if __name__ == '__main__':
        test()
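
  • One detail worth noting: this formulation penalises deviation from target_mean in both directions, while the penalty in match_minimize is one-sided (it only fires when the average exceeds the threshold). If you want the one-sided cap instead, a minimal sketch of the change (mean_err_cap is my own adaptation, not part of the function above) is to keep only the first of the two absolute-value inequalities:

    # One-sided variant: err >= sum(mean_cost[assign])/n - target_mean,
    # with err >= 0 coming from milp's default variable lower bound.
    mean_err_cap = LinearConstraint(
        A=scipy.sparse.bmat([[mean_cost.ravel(), [-n]]], format='csc'),
        lb=[-np.inf],
        ub=[n * target_mean],
    )
    # ...then pass (row_excl, col_excl, mean_err_cap) to milp.

    With the default weight of 1, test() pairs column 0 with row 2 and column 1 with row 0 (total assignment cost 8, mean exactly 6.0, zero error); with mean_err_weight=0.5 the cheaper cost-5 assignment wins instead, at mean 10.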