I am working with two different data structures on this search problem. The Uniform Cost Search uses a PriorityQueue and the A* Search uses a PriorityQueueWithFunction, both of which are pre-defined for me:
import heapq

class PriorityQueue:
    def __init__(self):
        self.heap = []
        self.count = 0

    def push(self, item, priority):
        # count breaks ties, so equal priorities pop in insertion order
        entry = (priority, self.count, item)
        heapq.heappush(self.heap, entry)
        self.count += 1

    def pop(self):
        (_, _, item) = heapq.heappop(self.heap)
        return item

    def isEmpty(self):
        return len(self.heap) == 0

class PriorityQueueWithFunction(PriorityQueue):
    # priorityFunction(item) -> priority
    def __init__(self, priorityFunction):
        self.priorityFunction = priorityFunction
        PriorityQueue.__init__(self)  # super-class call

    def push(self, item):
        # Adds an item to the Queue with priority from the priority function
        PriorityQueue.push(self, item, self.priorityFunction(item))
So here is my UniformCostSearch method, which is optimal in my implementation of an Agent finding a goal through a maze. Each node in a SearchProblem has three components: a state, which is a tuple of int coordinates; the cost to get to the state; and the directions to get to the state from the start:
def uniformCostSearch(problem):
    # A set to store already expanded states
    closed = set()
    fringe = PriorityQueue()
    fringe.push((problem.getStartState(), 0, []), 0)
    while not fringe.isEmpty():
        node, cost, directions = fringe.pop()
        if problem.isGoalState(node):
            return directions
        if not (node in closed):
            closed.add(node)
            for node, direction, step_cost in problem.getSuccessors(node):
                fringe.push((node, cost + step_cost, directions + [direction]),
                            cost + step_cost)
        if fringe.isEmpty():
            return []
This is optimal and reports 620 search nodes expanded on this specific maze layout. My problem is in my A* Search implementation:
def aStarSearch(problem, heuristic):
    closed = set()
    totalCost = 0  # Instantiate a totalCost counter
    # A* uses the total cost up to current node + heuristic to goal to decide priority
    fringe = PriorityQueueWithFunction(lambda x: totalCost +
                                       heuristic(problem.getStartState(), problem))
    fringe.push((problem.getStartState(), 0, []))
    while not fringe.isEmpty():
        node, cost, directions = fringe.pop()
        if problem.isGoalState(node):
            return directions
        if not (node in closed):
            closed.add(node)
            totalCost += cost
            for node, direction, cost in problem.getSuccessors(node):
                fringe.push((node, cost, directions + [direction]))
        if fringe.isEmpty():
            return []
Both A* Search and UniformCostSearch work and find a solution; however, I am getting the same number of search nodes expanded for both, which is my question. Why is A* returning 620 if UCS is returning 620 as well? (The target node expansion for A* in this scenario is 549.)
I think you're handling the costs incorrectly for both of your searches.
For uniformCostSearch, you're only specifying the cost of the last step for each node (the cost returned by getSuccessors). Since that is constant, your priority queue is just a regular queue and the whole thing is a breadth-first search. Now, because the priority queue prefers older values (those with lower counts), that's not actually any different from what you'd get if you passed in a real cost value (e.g. the old cost plus the new step's cost), but you should probably do it correctly in the first place:
def uniformCostSearch(problem):
    # An empty list to store already expanded states
    closed = []
    fringe = PriorityQueue()
    fringe.push((problem.getStartState(), 0, []), 0)
    while not fringe.isEmpty():
        node, cost, directions = fringe.pop()
        if problem.isGoalState(node):
            return directions
        if not (node in closed):
            closed.append(node)
            for node, direction, step_cost in problem.getSuccessors(node):
                fringe.push((node, cost + step_cost, directions + [direction]),
                            cost + step_cost)
        if fringe.isEmpty():
            return []
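The tie-breaking behaviour mentioned above (entries with equal priority come out oldest first) can be checked in isolation. This is a minimal sketch that copies the PriorityQueue from the question; the `drain` helper and the sample items are made up for illustration.

```python
import heapq

class PriorityQueue:
    """Same structure as the pre-defined class shown in the question."""
    def __init__(self):
        self.heap = []
        self.count = 0

    def push(self, item, priority):
        # The running count is the second tuple field, so it breaks priority ties
        heapq.heappush(self.heap, (priority, self.count, item))
        self.count += 1

    def pop(self):
        (_, _, item) = heapq.heappop(self.heap)
        return item

    def isEmpty(self):
        return len(self.heap) == 0

def drain(queue):
    """Hypothetical helper: pop everything and return the items in pop order."""
    items = []
    while not queue.isEmpty():
        items.append(queue.pop())
    return items

# Constant priority: the queue degenerates into a plain FIFO queue
constant = PriorityQueue()
for name in ["a", "b", "c", "d"]:
    constant.push(name, 1)
fifo_order = drain(constant)

# Varying priority: lowest priority first, insertion order only breaks ties
mixed = PriorityQueue()
for name, priority in [("a", 3), ("b", 1), ("c", 2), ("d", 1)]:
    mixed.push(name, priority)
priority_order = drain(mixed)

print(fifo_order)      # ['a', 'b', 'c', 'd']
print(priority_order)  # ['b', 'd', 'c', 'a']
```

With a constant priority the pop order is exactly the push order, which is why a search driven by a constant step cost behaves like breadth-first search.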
Your A* search is even more mixed up about costs. The cost function ignores its input and always passes the same node (the start state) to the heuristic function. Because every cost value is added to totalCost, each node gets a successively higher priority value when it's added to the queue. That makes the nodes get expanded in the same FIFO order as in the uniform cost search.
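To see the effect of a priority function that ignores its argument, here is a small sketch. The `heuristic` below is a hypothetical Manhattan-distance stand-in (the real one takes the problem as a second argument), and the two sample items are invented for illustration.

```python
def heuristic(state, goal=(0, 0)):
    # Hypothetical Manhattan-distance heuristic, standing in for the real one
    return abs(state[0] - goal[0]) + abs(state[1] - goal[1])

start_state = (5, 5)
totalCost = 0

# Shape of the priority function in the question: the argument x is never used
ignores_item = lambda x: totalCost + heuristic(start_state)

# Priority computed from the item itself: its own cost plus its own heuristic
def uses_item(item):
    state, cost_so_far, directions = item
    return cost_so_far + heuristic(state)

near_goal = ((1, 0), 4, ["West"])
far_from_goal = ((9, 9), 4, ["East"])

same = (ignores_item(near_goal), ignores_item(far_from_goal))
different = (uses_item(near_goal), uses_item(far_from_goal))

print(same)       # (10, 10)
print(different)  # (5, 22)
```

Two items pushed at the same moment always get the same priority from the first function, so only the insertion order decides which one is popped first.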
You need to make the cost function examine its argument's cost, and use the argument's node as an argument to the heuristic function. Try something like this:
def aStarSearch(problem, heuristic):
    closed = []
    # A* uses the total cost up to current node + heuristic to goal to decide priority
    def cost_func(tup):
        node, cost_so_far, directions = tup  # unpack argument tuple
        return cost_so_far + heuristic(node, problem)  # I'm guessing at heuristic's API
    fringe = PriorityQueueWithFunction(cost_func)
    fringe.push((problem.getStartState(), 0, []))
    while not fringe.isEmpty():
        node, cost, directions = fringe.pop()
        if problem.isGoalState(node):
            return directions
        if not (node in closed):
            closed.append(node)
            for node, direction, step_cost in problem.getSuccessors(node):
                fringe.push((node, cost + step_cost, directions + [direction]))
        if fringe.isEmpty():
            return []
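As a rough check that a priority of cost-so-far plus heuristic really changes the number of expansions, here is a self-contained sketch on a toy grid. `GridProblem`, `manhattan`, `null_heuristic` and the 5x5 layout are all assumptions made for this example, not part of your project, and the search uses heapq directly rather than the pre-defined queue classes. Running the same function with a zero heuristic gives uniform cost search.

```python
import heapq

class GridProblem:
    """Hypothetical stand-in for the SearchProblem: an open grid with unit step costs."""
    def __init__(self, width, height, start, goal):
        self.width, self.height = width, height
        self.start, self.goal = start, goal
        self.expanded = 0  # incremented on every getSuccessors call

    def getStartState(self):
        return self.start

    def isGoalState(self, state):
        return state == self.goal

    def getSuccessors(self, state):
        self.expanded += 1
        x, y = state
        moves = [("East", 1, 0), ("West", -1, 0), ("North", 0, 1), ("South", 0, -1)]
        successors = []
        for direction, dx, dy in moves:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.width and 0 <= ny < self.height:
                successors.append(((nx, ny), direction, 1))
        return successors

def manhattan(state, problem):
    return abs(state[0] - problem.goal[0]) + abs(state[1] - problem.goal[1])

def null_heuristic(state, problem):
    return 0

def a_star(problem, heuristic):
    closed = set()
    fringe = []
    count = 0  # unique tie-breaker, so states themselves are never compared
    start = problem.getStartState()
    heapq.heappush(fringe, (heuristic(start, problem), count, start, 0, []))
    while fringe:
        _, _, state, cost, directions = heapq.heappop(fringe)
        if problem.isGoalState(state):
            return directions
        if state not in closed:
            closed.add(state)
            for nxt, direction, step_cost in problem.getSuccessors(state):
                new_cost = cost + step_cost
                count += 1
                priority = new_cost + heuristic(nxt, problem)
                heapq.heappush(fringe, (priority, count, nxt, new_cost,
                                        directions + [direction]))
    return []

ucs_problem = GridProblem(5, 5, (0, 2), (4, 2))
ucs_path = a_star(ucs_problem, null_heuristic)      # zero heuristic == UCS

astar_problem = GridProblem(5, 5, (0, 2), (4, 2))
astar_path = a_star(astar_problem, manhattan)

print(ucs_path == astar_path)                         # True
print(astar_problem.expanded < ucs_problem.expanded)  # True
```

Both runs return the same four-step path, but the heuristic run only expands the states along the straight line to the goal, while the zero-heuristic run fans out in every direction first.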
One final suggestion is to use a set in place of the closed list. A set will be much faster at membership testing with in than a list (constant time on average, rather than O(N)), and you can add new values to it in (amortized) constant time.
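A quick sketch of the swap, assuming the states are hashable (coordinate tuples are). The sample states are made up; only the container type and the add/append call differ between the two versions.

```python
visited_states = [(0, 0), (0, 1), (1, 1), (0, 1)]  # note the repeated (0, 1)

closed_list = []
closed_set = set()
for state in visited_states:
    if state not in closed_list:   # scans the whole list: O(N)
        closed_list.append(state)
    if state not in closed_set:    # hash lookup: constant time on average
        closed_set.add(state)

print(sorted(closed_set) == sorted(closed_list))  # True
print((1, 1) in closed_set)                       # True
print((2, 2) in closed_set)                       # False
```

The search logic stays the same because `in` works on both containers; the set just answers the membership question without walking every stored state.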