I'm working on an optimization problem where I have a set of pairs that need to be assigned, each with associated costs.
I have been using scipy.optimize.linear_sum_assignment up to now.
I would now like to increase the complexity of the problem by controlling the selected pairs with an additional constraint matrix: I still want to minimize the cost of the selected pairs, but now with the constraint that the average value of the selected pairs with respect to a second matrix is kept close to some threshold.
Here's an example using NumPy arrays:
import numpy as np
cost_matrix = np.array([[3.1, 3.0], [3.0, 3.1]])
secondary_matrix = np.array([[1.0, 10.0], [10.0, 1.0]])
# Define the threshold for the average value in secondary_matrix
average_threshold = 6.0
In this toy example, cost_matrix is a square matrix (rarely the case in practice) where cost_matrix[i][j] represents the cost of pairing pairs[i] and pairs[j]. secondary_matrix is a matrix of the same shape, where secondary_matrix[i][j] corresponds to the secondary cost of pairing pairs[i] and pairs[j]. The average_threshold sets the maximum allowable average secondary cost over the selected pairs.
Using just scipy.optimize.linear_sum_assignment would give the pairs [0, 1] and [1, 0], whereas controlling for the secondary matrix would give the diagonal entries as the optimal pairs. This is what I want to achieve in the optimisation.
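For reference, the unconstrained solution can be checked directly; this snippet (reusing the arrays defined above) confirms the off-diagonal result:
from scipy.optimize import linear_sum_assignment

row_ind, col_ind = linear_sum_assignment(cost_matrix)
print(row_ind, col_ind)                           # [0 1] [1 0]: the off-diagonal pairs
print(cost_matrix[row_ind, col_ind].sum())        # 6.0, slightly cheaper than the diagonal's 6.2
print(secondary_matrix[row_ind, col_ind].mean())  # 10.0, well above average_threshold = 6.0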
I'm seeking guidance on suitable optimization approaches or libraries that can handle this type of problem efficiently. So far, I have implemented this approach:
import numpy as np
from scipy.optimize import linear_sum_assignment, minimize

def match_minimize(cost_matrix, constraint_matrix, average_threshold):
    def combined_objective(x):
        # Perturb the cost matrix by x, solve the resulting assignment, and
        # penalize any excess of the average secondary cost over the threshold
        row_indices, col_indices = linear_sum_assignment(cost_matrix + x.reshape(cost_matrix.shape))
        total_cost = cost_matrix[row_indices, col_indices].sum()
        selected_pairs_average = np.mean(constraint_matrix[row_indices, col_indices])
        return total_cost + max(0, selected_pairs_average - average_threshold)

    # Initial guess for the optimization variable x
    x0 = np.zeros_like(cost_matrix).flatten()
    # Solve the optimization problem
    result = minimize(combined_objective, x0, method="COBYLA")
    # Extract the optimal assignments
    row_indices, col_indices = linear_sum_assignment(cost_matrix + result.x.reshape(cost_matrix.shape))
    return row_indices, col_indices
In this approach, I add a penalty to the cost when the average of the selected values of the secondary matrix is above the average threshold.
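For context, this is how I call it on the toy example above, where it does seem to recover the diagonal pairs:
row_indices, col_indices = match_minimize(cost_matrix, secondary_matrix, average_threshold)
print(row_indices, col_indices)  # [0 1] [0 1], i.e. the diagonal pairs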
It seems to work for small matrices, but for my problem I need to work with matrices with thousands of elements, and I get a Python segfault once the matrix has more than 255 columns.
I am a beginner in optimisation, so I am not sure whether I'm on the right track.
Does my approach with match_minimize seem reasonable? Is there a way to get this to work with larger matrices? Or should I use a different approach?
All recommendations and advice are greatly appreciated. Thanks!
This is a pretty straightforward mixed-integer LP construction, though you need a weight parameter to decide how important the mean error is relative to the primary assignment cost. Try changing mean_err_weight in the following between 0.5 and 1.0 to observe the difference.
import numpy as np
from scipy.optimize import milp, LinearConstraint
import scipy.sparse
def mean_assignment(
    asn_cost: np.ndarray,
    mean_cost: np.ndarray,
    target_mean: float,
    mean_err_weight: float = 1,
) -> tuple[
    np.ndarray,  # assignments
    float,       # mean
    float,       # error from mean
]:
    """
    Perform linear sum assignment with a secondary objective of minimizing the
    absolute deviation of the assigned mean from a target; in other words,

        minimize sum(asn_cost[assign]) + mean_err_weight*abs(sum(mean_cost[assign])/n - target_mean)

    :param asn_cost: Assignment costs, 2D
    :param mean_cost: Mean costs; must be same shape as assignment costs
    :param target_mean: Number to which the mean of mean_cost[assign] will be pulled
    :param mean_err_weight: Optimization objective weight for mean cost
    :return: Assignment array of same shape as asn_cost; resulting mean of assigned
        mean_cost; and resulting error from mean
    """
    if asn_cost.shape != mean_cost.shape:
        raise ValueError('Shape mismatch')

    # Ensure a long matrix during optimization
    transpose = asn_cost.shape[0] < asn_cost.shape[1]
    if transpose:
        asn_cost = asn_cost.T
        mean_cost = mean_cost.T
    m, n = asn_cost.shape

    # Variables:
    #   m*n assignments, binary
    #   mean deviance, continuous
    cost_coeff = np.concatenate((
        asn_cost.ravel(),    # Assignment costs
        (mean_err_weight,),  # Cost of error from mean
    ))
    integrality = np.ones(m*n + 1)  # Assignments are binary
    integrality[-1] = 0             # Mean error is continuous

    # There must be at most one assignment per row (Kronecker)
    row_excl = LinearConstraint(
        A=scipy.sparse.hstack(
            (
                scipy.sparse.kron(scipy.sparse.eye(m), np.ones((1, n))),
                scipy.sparse.csc_array((m, 1)),
            ),
            format='csc',
        ),
        lb=np.zeros(m),
        ub=np.ones(m),
    )

    # There must be exactly one assignment per column
    col_excl = LinearConstraint(
        A=scipy.sparse.hstack(
            (scipy.sparse.eye(n),)*m +
            (scipy.sparse.csc_array((n, 1)),),
            format='csc',
        ),
        lb=np.ones(n),
        ub=np.ones(n),
    )
    # The error from the mean is taken as the absolute deviation:
    #   err >= +(sum(mean_cost[assign])/n - target_mean)
    #   err >= -(sum(mean_cost[assign])/n - target_mean)
    # Multiplying through by n gives the linear constraints
    #   mean_cost@assign - n*err <= n*target_mean
    #   mean_cost@assign + n*err >= n*target_mean
    mean_err_abs = LinearConstraint(
        A=scipy.sparse.bmat([
            [mean_cost.ravel(), [-n]],
            [mean_cost.ravel(), [+n]],
        ], format='csc'),
        lb=[-np.inf, n*target_mean],
        ub=[n*target_mean, +np.inf],
    )
    result = milp(
        c=cost_coeff, integrality=integrality,
        constraints=(row_excl, col_excl, mean_err_abs),
    )
    if not result.success:
        raise ValueError(result.message)

    assignments = result.x[:-1].reshape((m, n)).astype(int)
    mean_err = result.x[-1]
    mean = mean_cost[assignments.astype(bool)].mean()
    if transpose:
        assignments = assignments.T
    return assignments, mean, mean_err
def test() -> None:
    assign, mean, mean_err = mean_assignment(
        asn_cost=np.array([
            [5, 2],
            [3, 4],
            [6, 7],
        ]),
        mean_cost=np.array([
            [1., 10.],
            [10., 1.],
            [2., 2.],
        ]),
        target_mean=6.,
        mean_err_weight=1,  # change to 0.5 to see cost tradeoff
    )
    print(assign)
    print('mean =', mean)
    print('mean err =', mean_err)

if __name__ == '__main__':
    test()
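Enumerating the six feasible assignments of this test by hand suggests that with mean_err_weight=1 the solver should pick the (2, 0) and (0, 1) entries: assignment cost 8, with the assigned mean landing exactly on the target (mean = 6.0, mean err = 0.0). Dropping the weight to 0.5 should flip it to the cheaper (1, 0)/(0, 1) assignment (cost 5) at the price of a mean error of 4.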