I am using Yen's Algorithm (Wikipedia) to find the k shortest paths in a graph. In the example below, my graph is a dictionary where each node is a key and its value is the list of its neighbors. Map() from dotmap simply allows dictionaries to be converted into objects whose keys can be accessed with dot notation. I want to find the four shortest paths from A to F, ordered from shortest to longest, where every edge has equal weight. The first two are tied, (A > B > D > F) and (A > E > D > F); the next is (A > B > C > G > F), and the last is (A > B > D > C > G > F). It is possible that my implementation of Dijkstra's algorithm (called AStar despite having no heuristic) is flawed, because it returns an empty list when no path is found. How can I make my code pick only the valid paths? Currently it returns [['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], [], []] -- it should return [['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], ['A', 'B', 'C', 'G', 'F'], ['A', 'B', 'D', 'C', 'G', 'F']], which are the four shortest paths.
import copy
import heapq
from dotmap import Map
from itertools import count
# Directed adjacency list: every node is a key and maps to the list of
# nodes it has an outgoing edge to ('F' has no outgoing edges).
graph = {
    'A': ['B', 'E'],
    'B': ['C', 'D'],
    'C': ['G'],
    'D': ['C', 'F'],
    'E': ['D'],
    'F': [],
    'G': ['F'],
}
class PriorityQueue:
    """Min-priority queue: a thin wrapper around a ``heapq``-managed list.

    Entries are stored as ``(priority, item)`` tuples, so items that share a
    priority are ordered by comparing the items themselves.
    """

    def __init__(self):
        # Heap storage; only ever touched through heapq.
        self.elements = []
        # Created here but not read by any method of this class.
        self._counter = count()

    def empty(self):
        """Return True when the queue holds no entries."""
        return not self.elements

    def put(self, item, priority):
        """Queue ``item`` under ``priority`` (lower values are served first)."""
        entry = (priority, item)
        heapq.heappush(self.elements, entry)

    def get(self):
        """Remove the lowest-priority entry and return its item."""
        _priority, item = heapq.heappop(self.elements)
        return item
class AStar:
    """Uniform-cost search over an adjacency-list graph.

    Every edge is charged a cost of 1 and no heuristic is applied. The
    frontier is seeded in ``__init__`` and consumed by ``search``.
    """

    def __init__(self, graph, start, goals=[]):
        """Store the search inputs and seed the frontier with ``start``."""
        self.graph = graph
        self.start = start
        self.goals = goals
        # Filled in by search() once a goal has been reached.
        self.goal = None
        self.final_path = None
        # Predecessor and best-known cost for every discovered node.
        self.previous = {start: None}
        self.costs = {start: 0}
        self.frontier = PriorityQueue()
        self.frontier.put(start, 0)

    def search(self):
        """Expand nodes in cost order until one of ``goals`` is dequeued.

        Returns a ``Map`` with ``path`` (list of nodes from start to goal)
        and ``cost``. When the frontier runs dry without reaching a goal the
        result is an empty ``path`` with a ``cost`` of 0.
        """
        queue = self.frontier
        best = self.costs
        while not queue.empty():
            node = queue.get()
            if node in self.goals:
                reached_cost = best[node]
                self.goal = node
                self.final_path = self.trace_path()
                return Map({'path': self.final_path, 'cost': reached_cost})
            # All outgoing edges cost 1, so every neighbour shares this value.
            step_cost = best[node] + 1
            for neighbour in self.graph[node]:
                if neighbour in best and step_cost >= best[neighbour]:
                    continue  # already known at least as cheaply
                best[neighbour] = step_cost
                queue.put(neighbour, step_cost)
                self.previous[neighbour] = node
        # No path found
        return Map({'path': [], 'cost': 0})

    def trace_path(self):
        """Follow ``previous`` links from ``self.goal`` back to ``self.start``."""
        steps = []
        node = self.goal
        while node != self.start:
            steps.append(node)
            node = self.previous[node]
        steps.append(self.start)
        return steps[::-1]
def YenKSP(graph, source, sink, k_paths):
    """Collect paths from ``source`` to ``sink`` following Yen's algorithm.

    ``A`` holds the accepted paths and ``B`` the candidate pool. For every
    node of the most recently accepted path (except its last), the edge that
    each accepted path with the same root takes out of that node is removed
    from a working copy of ``graph``, and a spur path is searched from there.
    """
    shortest = AStar(graph, source, sink).search()
    A = [shortest.path]
    B = []
    working_graph = copy.deepcopy(graph)
    for _ in range(k_paths - 1):
        previous_path = A[-1]
        for i, spur_node in enumerate(previous_path[:-1]):
            root_path = previous_path[:i + 1]
            for accepted in A:
                if len(accepted) > i and accepted[:i + 1] == root_path:
                    working_graph[accepted[i]].remove(accepted[i + 1])
            result = AStar(working_graph, spur_node, sink).search()
            total_path = root_path[:-1] + result.path
            # Cost of reaching the spur node, measured on the pruned graph.
            spur_cost = AStar(working_graph, source, spur_node).search().cost
            B.append(Map({'path': total_path, 'cost': result.cost + spur_cost}))
            # Start the next spur from an unpruned copy.
            working_graph = copy.deepcopy(graph)
        if not B:
            break
        B.sort(key=lambda candidate: (candidate.cost, len(candidate.path)))
        # The first (cheapest) candidate is accepted; the last one is discarded.
        A.append(B[0].path)
        B.pop()
    return A
# Request four paths from 'A' to 'F' on the example graph and print them.
paths = YenKSP(graph, 'A', 'F', 4)
print(paths)
import copy
import heapq
#from dotmap import Map
from itertools import count
class Map(dict):
    """Minimal stand-in for ``dotmap.Map``: a dict with attribute-style access.

    ``m.key`` reads ``m['key']`` and ``m.key = value`` writes ``m['key']``.
    """

    def __getattr__(self, k):
        """Return ``self[k]``; raise ``AttributeError`` when the key is absent.

        Python only calls this after normal attribute lookup has failed. A
        missing key must surface as ``AttributeError`` (not ``KeyError``) so
        that ``hasattr``, ``getattr(obj, name, default)`` and
        ``copy.deepcopy`` (which probes for ``__deepcopy__``) behave.
        """
        try:
            return self[k]
        except KeyError:
            raise AttributeError(k) from None

    def __setattr__(self, k, v):
        """Store every attribute assignment as a dictionary item."""
        self[k] = v
# Example directed graph as an adjacency list: node -> list of successors.
# 'F' is the only node without outgoing edges.
graph = {
    'A': ['B', 'E'],
    'B': ['C', 'D'],
    'C': ['G'],
    'D': ['C', 'F'],
    'E': ['D'],
    'F': [],
    'G': ['F'],
}
class PriorityQueue:
    """Min-priority queue backed by ``heapq``.

    Entries are stored as ``(priority, sequence, item)``. The monotonically
    increasing sequence number breaks ties, so items with equal priority are
    returned in insertion order and the items themselves are never compared
    (they therefore do not need to be orderable).
    """

    def __init__(self):
        self.elements = []
        # Source of tie-breaking sequence numbers for put().
        self._counter = count()

    def empty(self):
        """Return True when the queue holds no entries."""
        return not self.elements

    def put(self, item, priority):
        """Queue ``item`` under ``priority`` (lower values are served first)."""
        heapq.heappush(self.elements, (priority, next(self._counter), item))

    def get(self):
        """Remove and return the item with the lowest priority.

        Raises:
            IndexError: if the queue is empty.
        """
        return heapq.heappop(self.elements)[-1]
class AStar:
    """Uniform-cost (Dijkstra) search over an adjacency-list graph.

    Every edge costs 1 and no heuristic is applied, so despite the name this
    is plain Dijkstra. An instance is single-use: the frontier is seeded in
    ``__init__`` and drained by ``search``.
    """

    def __init__(self, graph, start, goals=None):
        """Prepare a search that begins at ``start``.

        Args:
            graph: Mapping of node -> iterable of successor nodes. Every node
                that gets expanded is looked up as a key.
            start: Node the search begins at.
            goals: Collection of acceptable goal nodes. A single string is
                treated as one node name, not as a sequence of characters.
                ``None`` (the default) means no goals.
        """
        if goals is None:
            # Fresh list per instance instead of a shared mutable default.
            goals = []
        elif isinstance(goals, str):
            # ``state in "XYZ"`` is a substring test, which would accept
            # nodes such as "X" or "YZ"; wrap the name so only an exact
            # match counts as reaching the goal.
            goals = [goals]
        self.graph = graph
        self.start = start
        self.frontier = PriorityQueue()
        self.frontier.put(start, 0)
        # Predecessor of each discovered node, used to rebuild the path.
        self.previous = {start: None}
        # Cheapest known cost from start to each discovered node.
        self.costs = {start: 0}
        self.final_path = None
        self.goals = goals
        self.goal = None

    def search(self):
        """Expand nodes in cost order until one of the goals is dequeued.

        Returns:
            A ``Map`` with ``path`` (list of nodes from start to the reached
            goal) and ``cost`` (number of edges on that path). When no goal
            is reachable, ``path`` is ``[]`` and ``cost`` is ``float('inf')``.
        """
        graph = self.graph
        frontier = self.frontier
        goals = self.goals
        costs = self.costs
        while not frontier.empty():
            state = frontier.get()
            if state in goals:
                self.goal = state
                self.final_path = self.trace_path()
                return Map({'path': self.final_path, 'cost': costs[state]})
            new_cost = costs[state] + 1  # unit edge weight
            for next_state in graph[state]:
                if next_state not in costs or new_cost < costs[next_state]:
                    costs[next_state] = new_cost
                    frontier.put(next_state, new_cost)
                    self.previous[next_state] = state
        # No path found: infinite cost keeps this result from ever looking
        # cheaper than a real path.
        return Map({'path': [], 'cost': float('inf')})

    def trace_path(self):
        """Rebuild the start -> goal path by following ``previous`` links."""
        current = self.goal
        path = []
        while current != self.start:
            path.append(current)
            current = self.previous[current]
        path.append(self.start)
        path.reverse()
        return path
def YenKSP(graph, source, sink, k_paths):
    """Return the shortest loopless paths from ``source`` to ``sink``.

    Implements Yen's k-shortest-paths algorithm on top of ``AStar``, which
    charges a cost of 1 per edge.

    Args:
        graph: Mapping of node -> list of successor nodes. It is not modified.
        source: Node every path starts at.
        sink: Target node. A list, set or frozenset is handed to ``AStar`` as
            a collection of goals; any other value is treated as one node.
        k_paths: Number of paths wanted.

    Returns:
        A list of paths (each a list of nodes) ordered by cost, then by
        length. The shortest path is always included when one exists; the
        list is shorter than ``k_paths`` when the graph has fewer distinct
        paths and empty when ``sink`` cannot be reached at all.
    """
    # Wrap a single node so AStar matches it exactly; a bare string would be
    # matched by substring.
    goals = sink if isinstance(sink, (list, set, frozenset)) else [sink]
    first = AStar(graph, source, goals).search()
    if not first.path:
        return []
    A = [first.path]  # accepted paths, shortest first
    B = []            # candidate pool
    for _ in range(1, k_paths):
        last_path = A[-1]
        for i in range(len(last_path) - 1):
            spur_node = last_path[i]
            root_path = last_path[:i + 1]
            # Fresh working copy for every spur node. Nodes already on the
            # root path (other than the spur node) are made unreachable so
            # the spur path cannot loop back through them.
            blocked = set(root_path[:-1])
            graph_clone = {
                node: [nxt for nxt in successors if nxt not in blocked]
                for node, successors in graph.items()
            }
            # Cut the edge that each accepted path sharing this root takes
            # out of the spur node, forcing the search to deviate.
            for path in A:
                if len(path) > i + 1 and root_path == path[:i + 1]:
                    # Several accepted paths may use the same edge; only
                    # remove it if it is still present.
                    if path[i + 1] in graph_clone[path[i]]:
                        graph_clone[path[i]].remove(path[i + 1])
            result = AStar(graph_clone, spur_node, goals).search()
            if not result.path:
                continue  # no deviation exists from this spur node
            total_path = root_path[:-1] + result.path
            if total_path in A or any(c.path == total_path for c in B):
                continue  # the same candidate was already produced
            # The root path has i edges of cost 1 each. Counting them gives
            # its true cost; re-searching the pruned graph could find a
            # different, shorter route to the spur node or none at all.
            B.append(Map({'path': total_path, 'cost': i + result.cost}))
        if not B:
            break
        B.sort(key=lambda p: (p.cost, len(p.path)))
        A.append(B.pop(0).path)
    return A
# Request four paths from 'A' to 'F' on the example graph and print them.
paths = YenKSP(graph, 'A', 'F', 4)
print(paths)
Produces:
[['A', 'B', 'D', 'F'], ['A', 'E', 'D', 'F'], ['A', 'B', 'C', 'G', 'F'], ['A', 'B', 'D', 'C', 'G', 'F']]
The main issue was that when no path was found, your default result had a cost of 0. So when B was sorted by path cost, these empty paths appeared to be the best choice in B and were added to A. I changed the default path cost to float('inf'). Doing so revealed an error that could occur when you tried to remove the same edge twice from graph_clone (inside for path in A: ...), so I added an if check to conditionally remove the edge. The last two things the diff indicates that I did were (a) imitate your dotmap.Map class (you can remove this and uncomment the import), and (b) only add a path to the result set A if its cost is finite.