Search code examples
pythonnumpy

Fast calculation of Pareto front in Python


I have a set of points in a 3D space, from which I need to find the Pareto frontier. Speed of execution is very important here, and time increases very fast as I add points to test.

The set of points looks like this:

[[0.3296170319979843, 0.0, 0.44472108843537406], [0.3296170319979843,0.0, 0.44472108843537406], [0.32920760896951373, 0.0, 0.4440408163265306], [0.32920760896951373, 0.0, 0.4440408163265306], [0.33815192743764166, 0.0, 0.44356462585034007]]

Right now, I'm using this algorithm:

def dominates(row, candidateRow):
    return sum([row[x] >= candidateRow[x] for x in range(len(row))]) == len(row) 

def simple_cull(inputPoints, dominates):
    paretoPoints = set()
    candidateRowNr = 0
    dominatedPoints = set()
    while True:
        candidateRow = inputPoints[candidateRowNr]
        inputPoints.remove(candidateRow)
        rowNr = 0
        nonDominated = True
        while len(inputPoints) != 0 and rowNr < len(inputPoints):
            row = inputPoints[rowNr]
            if dominates(candidateRow, row):
                # If it is worse on all features remove the row from the array
                inputPoints.remove(row)
                dominatedPoints.add(tuple(row))
            elif dominates(row, candidateRow):
                nonDominated = False
                dominatedPoints.add(tuple(candidateRow))
                rowNr += 1
            else:
                rowNr += 1

        if nonDominated:
            # add the non-dominated point to the Pareto frontier
            paretoPoints.add(tuple(candidateRow))

        if len(inputPoints) == 0:
            break
    return paretoPoints, dominatedPoints

Found here: http://code.activestate.com/recipes/578287-multidimensional-pareto-front/

What's the fastest way to find the set of non-dominated solutions in an ensemble of solutions? Or, in short, can Python do better than this algorithm?


Solution

  • If you're worried about actual speed, you definitely want to use numpy (as the clever algorithmic tweaks probably have way less effect than the gains to be had from using array operations). Here are three solutions that all compute the same function. The is_pareto_efficient_dumb solution is slower in most situations but becomes faster as the number of costs increases, the is_pareto_efficient_simple solution is much more efficient than the dumb solution for many points, and the final is_pareto_efficient function is less readable but the fastest (so all are Pareto Efficient!).

    import numpy as np
    
    
    # Very slow for many datapoints.  Fastest for many costs, most readable
    def is_pareto_efficient_dumb(costs):
        """
        Find the pareto-efficient points
        :param costs: An (n_points, n_costs) array
        :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
        """
        is_efficient = np.ones(costs.shape[0], dtype = bool)
        for i, c in enumerate(costs):
            is_efficient[i] = np.all(np.any(costs[:i]>c, axis=1)) and np.all(np.any(costs[i+1:]>c, axis=1))
        return is_efficient
    
    
    # Fairly fast for many datapoints, less fast for many costs, somewhat readable
    def is_pareto_efficient_simple(costs):
        """
        Find the pareto-efficient points
        :param costs: An (n_points, n_costs) array
        :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
        """
        is_efficient = np.ones(costs.shape[0], dtype = bool)
        for i, c in enumerate(costs):
            if is_efficient[i]:
                is_efficient[is_efficient] = np.any(costs[is_efficient]<c, axis=1)  # Keep any point with a lower cost
                is_efficient[i] = True  # And keep self
        return is_efficient
    
    
    # Faster than is_pareto_efficient_simple, but less readable.
    def is_pareto_efficient(costs, return_mask = True):
        """
        Find the pareto-efficient points
        :param costs: An (n_points, n_costs) array
        :param return_mask: True to return a mask
        :return: An array of indices of pareto-efficient points.
            If return_mask is True, this will be an (n_points, ) boolean array
            Otherwise it will be a (n_efficient_points, ) integer array of indices.
        """
        is_efficient = np.arange(costs.shape[0])
        n_points = costs.shape[0]
        next_point_index = 0  # Next index in the is_efficient array to search for
        while next_point_index<len(costs):
            nondominated_point_mask = np.any(costs<costs[next_point_index], axis=1)
            nondominated_point_mask[next_point_index] = True
            is_efficient = is_efficient[nondominated_point_mask]  # Remove dominated points
            costs = costs[nondominated_point_mask]
            next_point_index = np.sum(nondominated_point_mask[:next_point_index])+1
        if return_mask:
            is_efficient_mask = np.zeros(n_points, dtype = bool)
            is_efficient_mask[is_efficient] = True
            return is_efficient_mask
        else:
            return is_efficient
    

    Profiling tests (using points drawn from a Normal distribution):

    With 10,000 samples, 2 costs:

    is_pareto_efficient_dumb: Elapsed time is 1.586s
    is_pareto_efficient_simple: Elapsed time is 0.009653s
    is_pareto_efficient: Elapsed time is 0.005479s
    

    With 1,000,000 samples, 2 costs:

    is_pareto_efficient_dumb: Really, really, slow
    is_pareto_efficient_simple: Elapsed time is 1.174s
    is_pareto_efficient: Elapsed time is 0.4033s
    

    With 10,000 samples, 15 costs:

    is_pareto_efficient_dumb: Elapsed time is 4.019s
    is_pareto_efficient_simple: Elapsed time is 6.466s
    is_pareto_efficient: Elapsed time is 6.41s
    

    Note that if efficiency is a concern you can gain maybe a further 2x or so speedup by reordering your data beforehand, see here.