Search code examples
python search dijkstra shortest-path

Yen's Algorithm implementation not choosing shortest paths


I am using Yen's Algorithm (Wikipedia) to find the k shortest paths in a graph. In the example below, my graph is a dictionary where each node is a key and its value is the list of its neighbors. Map() from dotmap simply allows dictionaries to be converted into objects whose keys can be accessed with dot notation. I want to find the four shortest paths from A to F, ordered from shortest to longest, where every edge has equal weight. The first two are tied, (A > B > D > F) and (A > E > D > F), and the next two are (A > B > C > G > F) and finally (A > B > D > C > G > F). It is possible that my implementation of Dijkstra's (called AStar despite having no heuristic) is flawed, because it returns an empty list when no path is found. How can I have my code pick only the valid paths? Currently it returns [['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], [], []] -- it should return [['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], ['A', 'B', 'C', 'G', 'F'], ['A', 'B', 'D', 'C', 'G', 'F']], which are the shortest paths.

import copy
import heapq
from dotmap import Map
from itertools import count

# Example graph as an adjacency mapping: each key is a node and its value
# lists the nodes reachable over one outgoing edge.
graph = dict(
    A=['B', 'E'],
    B=['C', 'D'],
    C=['G'],
    D=['C', 'F'],
    E=['D'],
    F=[],
    G=['F'],
)

class PriorityQueue:
    """Minimal min-priority queue built on a ``heapq`` list."""

    def __init__(self):
        # Heap of (priority, item) tuples; the smallest tuple is popped first.
        self.elements = []
        # Counter created alongside the heap; no method in this class reads it.
        self._counter = count()

    def empty(self):
        """Return True when the queue holds no entries."""
        return not self.elements

    def put(self, item, priority):
        """Insert *item* so that lower *priority* values are popped first."""
        entry = (priority, item)
        heapq.heappush(self.elements, entry)

    def get(self):
        """Remove the smallest (priority, item) entry and return its item."""
        _priority, item = heapq.heappop(self.elements)
        return item


class AStar:
    """Best-first search over an adjacency mapping with a cost of 1 per edge.

    No heuristic term is added to the priority, so nodes are expanded purely
    in order of accumulated cost from ``start``.
    """

    def __init__(self, graph, start, goals=[]):
        """Prepare a search from *start*; *goals* is tested with ``in``."""
        self.graph = graph
        self.start = start
        self.goals = goals
        self.goal = None
        self.final_path = None
        # Best known predecessor and cost for every node reached so far.
        self.previous = {start: None}
        self.costs = {start: 0}
        self.frontier = PriorityQueue()
        self.frontier.put(start, 0)

    def search(self):
        """Expand nodes until a goal is popped or the frontier is exhausted.

        Returns a ``Map`` with ``path`` (list of nodes from start to goal)
        and ``cost``; when no goal is reachable the path is ``[]`` and the
        cost is ``0``.
        """
        while not self.frontier.empty():
            current = self.frontier.get()

            if current in self.goals:
                reached_cost = self.costs[current]
                self.goal = current
                self.final_path = self.trace_path()
                return Map({'path': self.final_path, 'cost': reached_cost})

            for neighbor in self.graph[current]:
                candidate = self.costs[current] + 1
                unseen = neighbor not in self.costs
                if unseen or candidate < self.costs[neighbor]:
                    # Record the cheaper route and queue the node (again).
                    self.costs[neighbor] = candidate
                    self.frontier.put(neighbor, candidate)
                    self.previous[neighbor] = current

        # No path found
        return Map({'path': [], 'cost': 0})

    def trace_path(self):
        """Walk ``previous`` links back from the goal; return start-to-goal list."""
        backwards = []
        node = self.goal
        while node != self.start:
            backwards.append(node)
            node = self.previous[node]
        backwards.append(self.start)
        return backwards[::-1]

def YenKSP(graph, source, sink, k_paths):
    """Collect up to ``k_paths`` paths from ``source`` to ``sink``.

    Follows the outline of Yen's algorithm: the first path comes from a
    plain search, and each further path is chosen from candidates built by
    branching off ("spurring") at every node of the most recently accepted
    path.

    Args:
        graph: mapping of node -> list of neighbor nodes.
        source: start node.
        sink: target node; passed to ``AStar`` as its ``goals`` argument.
        k_paths: number of paths requested.

    Returns:
        The list ``A`` of accepted paths, each a list of nodes.
    """
    graph_clone = copy.deepcopy(graph)
    # A holds accepted paths; B holds candidate paths with their costs.
    A = [AStar(graph, source, sink).search().path]
    B = []

    for k in range(1, k_paths):
        # Try every node of the last accepted path, except its final node,
        # as a spur node.
        for i in range(len(A[-1]) - 1):
            spur_node = A[-1][i]
            root_path = A[-1][:i+1]
            # Drop the edge each accepted path takes out of this root so the
            # spur search has to deviate. list.remove raises ValueError if
            # the edge is no longer present in the clone.
            for path in A:
                if len(path) > i and root_path == path[:i+1]:
                    graph_clone[path[i]].remove(path[i+1])

            result = AStar(graph_clone, spur_node, sink).search()
            spur_path = result.path
            # Root without its last node (the spur node) plus the spur path.
            # AStar.search returns path [] with cost 0 when nothing is found,
            # and that result is appended to B like any other.
            total_path = root_path[:-1] + spur_path
            # Root cost is taken from a new search on the pruned clone.
            spur_cost = AStar(graph_clone, source, spur_node).search().cost
            B.append(Map({'path': total_path, 'cost': result.cost + spur_cost}))
            # Restore a pristine copy for the next spur node.
            graph_clone = copy.deepcopy(graph)
        if len(B) == 0:
            break
        # Cheapest candidate first; path length breaks cost ties.
        B.sort(key=lambda p: (p.cost, len(p.path)))
        A.append(B[0].path)
        # list.pop() with no index removes the LAST element of B, so B[0]
        # (the candidate just accepted) stays in the candidate list.
        B.pop()
    return A

# Request four paths from 'A' to 'F' on the example graph and show them.
paths = YenKSP(graph, 'A', 'F', k_paths=4)
print(paths)

Solution

  • import copy
    import heapq
    #from dotmap import Map
    from itertools import count
    
    class Map(dict):
        """Dictionary whose keys can also be read and written as attributes.

        Lightweight replacement for the commented-out ``dotmap`` import:
        ``m.key`` reads ``m['key']`` and ``m.key = v`` stores ``m['key'] = v``.
        """

        def __getattr__(self, k):
            """Return ``self[k]``; raise AttributeError when *k* is not a key.

            Python calls ``__getattr__`` only after normal attribute lookup
            fails, so regular dict methods are unaffected. A missing key
            must surface as AttributeError (not KeyError) so that
            ``hasattr()`` and ``getattr(obj, name, default)`` work.
            """
            try:
                return self[k]
            except KeyError:
                raise AttributeError(k) from None

        def __setattr__(self, k, v):
            """Store *v* under key *k* instead of in the instance ``__dict__``."""
            self[k] = v
    
    # Example graph as an adjacency mapping: each key is a node and its value
    # lists the nodes reachable over one outgoing edge.
    graph = dict(
        A=['B', 'E'],
        B=['C', 'D'],
        C=['G'],
        D=['C', 'F'],
        E=['D'],
        F=[],
        G=['F'],
    )
    
    class PriorityQueue:
        """Min-priority queue on top of ``heapq`` with stable tie-breaking.

        Entries are stored as ``(priority, sequence, item)``. The sequence
        number comes from a monotonically increasing counter, so entries
        with equal priority are returned in insertion order and the items
        themselves are never compared (they need not be orderable).
        """

        def __init__(self):
            self.elements = []
            # Insertion counter used as the tie-breaker between equal priorities.
            self._counter = count()

        def empty(self):
            """Return True when the queue holds no entries."""
            return not self.elements

        def put(self, item, priority):
            """Insert *item*; lower *priority* values are popped first."""
            heapq.heappush(self.elements, (priority, next(self._counter), item))

        def get(self):
            """Remove and return the item with the lowest priority.

            Raises:
                IndexError: if the queue is empty (from ``heapq.heappop``).
            """
            return heapq.heappop(self.elements)[-1]
    
    
    class AStar:
        """Uniform-cost search (no heuristic) over an adjacency mapping.

        Every edge costs 1. ``graph`` maps each node to a list of its
        neighbors; a node that appears only as a neighbor and has no entry of
        its own is treated as having no outgoing edges.
        """

        def __init__(self, graph, start, goals=None):
            """Set up a search from *start*.

            Args:
                graph: mapping of node -> list of neighbor nodes.
                start: node the search begins at.
                goals: container of acceptable goal nodes, or a single node
                    given as a string. Defaults to no goals.
            """
            self.graph = graph
            self.start = start
            self.frontier = PriorityQueue()
            self.frontier.put(start, 0)
            self.previous = {start: None}
            self.costs = {start: 0}
            self.final_path = None
            if goals is None:
                # Fresh list per instance instead of a shared mutable default.
                goals = []
            elif isinstance(goals, str):
                # A bare string would turn ``state in goals`` into a substring
                # test ('A' in 'AB' is True); wrap it so membership compares
                # whole node names.
                goals = [goals]
            self.goals = goals
            self.goal = None

        def search(self):
            """Run the search and return a ``Map`` with ``path`` and ``cost``.

            ``path`` is the list of nodes from start to the first goal popped
            from the frontier and ``cost`` its number of edges. When no goal
            is reachable, ``path`` is ``[]`` and ``cost`` is ``float('inf')``.
            """
            graph = self.graph
            frontier = self.frontier
            goals = self.goals
            costs = self.costs
            while not frontier.empty():
                state = frontier.get()
                if state in goals:
                    self.goal = state
                    self.final_path = self.trace_path()
                    return Map({'path': self.final_path, 'cost': costs[state]})

                new_cost = costs[state] + 1
                # .get(): nodes without their own entry have no neighbors.
                for next_state in graph.get(state, ()):
                    if next_state not in costs or new_cost < costs[next_state]:
                        costs[next_state] = new_cost
                        frontier.put(next_state, new_cost)
                        self.previous[next_state] = state
            # No path found: infinite cost keeps this result from ever
            # looking cheaper than a real path.
            return Map({'path': [], 'cost': float('inf')})

        def trace_path(self):
            """Follow ``previous`` links from the goal back to the start.

            Returns the nodes in start-to-goal order.
            """
            current = self.goal
            path = []
            while current != self.start:
                path.append(current)
                current = self.previous[current]
            path.append(self.start)
            path.reverse()
            return path
    
    def YenKSP(graph, source, sink, k_paths):
        """Return up to ``k_paths`` loopless shortest paths using Yen's algorithm.

        Args:
            graph: mapping of node -> list of neighbor nodes (every edge has
                cost 1, matching ``AStar``).
            source: start node.
            sink: target node.
            k_paths: maximum number of paths to return.

        Returns:
            A list of paths (each a list of nodes from ``source`` to
            ``sink``) ordered from cheapest to most expensive. The list is
            shorter than ``k_paths`` when fewer distinct paths exist, and
            empty when ``sink`` is unreachable or ``k_paths`` is below 1.
        """
        if k_paths < 1:
            return []

        # Pass the sink inside a list so goal matching is by whole node name.
        first = AStar(graph, source, [sink]).search()
        if not first.path:
            return []

        A = [first.path]  # accepted shortest paths, in order
        B = []            # candidate paths not yet accepted

        for _ in range(1, k_paths):
            last_path = A[-1]
            for i in range(len(last_path) - 1):
                spur_node = last_path[i]
                root_path = last_path[:i + 1]

                # Edges already used out of this root by accepted paths; the
                # spur search must leave the spur node some other way.
                removed_edges = {
                    (path[i], path[i + 1])
                    for path in A
                    if len(path) > i + 1 and path[:i + 1] == root_path
                }
                # Root nodes before the spur node are off limits so the spur
                # path cannot loop back through the root.
                blocked = set(root_path[:-1])
                graph_clone = {
                    node: [
                        nxt for nxt in neighbors
                        if nxt not in blocked and (node, nxt) not in removed_edges
                    ]
                    for node, neighbors in graph.items()
                    if node not in blocked
                }

                result = AStar(graph_clone, spur_node, [sink]).search()
                if not result.path:
                    # No deviation exists from this spur node.
                    continue

                total_path = root_path[:-1] + result.path
                if total_path in A or any(c.path == total_path for c in B):
                    continue
                # The root contributes exactly i unit-cost edges; using it
                # directly avoids re-searching a pruned graph, which could
                # report a shorter route than the actual root prefix.
                B.append(Map({'path': total_path, 'cost': i + result.cost}))

            if not B:
                break

            # Stable sort: cheapest candidate first, earliest found wins ties.
            B.sort(key=lambda p: (p.cost, len(p.path)))
            A.append(B.pop(0).path)
        return A
    
    # Request four paths from 'A' to 'F' on the example graph and show them.
    paths = YenKSP(graph, 'A', 'F', k_paths=4)
    print(paths)
    

    Produces:

    [['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], ['A', 'B', 'C', 'G', 'F'], ['A', 'B', 'D', 'C', 'G', 'F']]
    

    The main issue was that when no path was found, your default returned a path with a cost of 0. When B was sorted by path cost, these empty paths therefore appeared to be the best choice and were added to A. I changed the default path cost to float('inf'). Doing so revealed an error that could occur when you tried to remove the same edge twice from graph_clone (inside for path in A: ...), so I added an if check to remove the edge only when it is still present. The last two things the diff shows are that I (a) imitated your dotmap.Map class (you can remove this and uncomment the import), and (b) only add a path to the result set A if its cost is finite.