Tags: python, search, nodes, priority-queue, heuristics

Why is my A* Search returning the same expanded space as my UniformCostSearch?


I am working with two different data structures for this search problem. Uniform Cost Search uses a PriorityQueue and A* Search uses a PriorityQueueWithFunction, both of which are pre-defined for me:

import heapq

class PriorityQueue:
    def __init__(self):
        self.heap = []
        self.count = 0

    def push(self, item, priority):
        entry = (priority, self.count, item)
        heapq.heappush(self.heap, entry)
        self.count += 1

    def pop(self):
        (_, _, item) = heapq.heappop(self.heap)
        return item

    def isEmpty(self):
        return len(self.heap) == 0

class PriorityQueueWithFunction(PriorityQueue):
    # priorityFunction(item) -> priority
    def __init__(self, priorityFunction):
        self.priorityFunction = priorityFunction
        PriorityQueue.__init__(self) # super-class call

    def push(self, item):
        # Adds an item to the Queue with priority from the priority function
        PriorityQueue.push(self, item, self.priorityFunction(item))

Here is my uniformCostSearch method, which is optimal in my implementation of an agent finding a goal through a maze. Each entry on the fringe has three components: a state, which is a tuple of int coordinates; the cost to reach that state; and the directions to reach that state from the start:

def uniformCostSearch(problem):
    # A set to store already expanded states
    closed = set()
    fringe = PriorityQueue()
    fringe.push((problem.getStartState(), 0, []), 0)

    while not fringe.isEmpty():
        node, cost, directions = fringe.pop()

        if problem.isGoalState(node):
            return directions

        if not (node in closed):
            closed.add(node)

            for node, direction, step_cost in problem.getSuccessors(node):
                fringe.push((node, cost + step_cost, directions + [direction]), 
                                                           cost + step_cost)

    if fringe.isEmpty():
        return []

This is optimal and expands 620 nodes on the specific maze layout I am using. My problem is in my A* Search implementation:

def aStarSearch(problem, heuristic):
    closed = set()
    totalCost = 0 # Instantiate a totalCost counter
    # A* uses the total cost up to current node + heuristic to goal to decide priority
    fringe = PriorityQueueWithFunction(lambda x: totalCost +
                                       heuristic(problem.getStartState(), problem))
    fringe.push((problem.getStartState(), 0, []))

    while not fringe.isEmpty():
        node, cost, directions = fringe.pop()

        if problem.isGoalState(node):
            return directions

        if not (node in closed):
            closed.add(node)
            totalCost += cost

            for node, direction, cost in problem.getSuccessors(node):
                fringe.push((node, cost, directions + [direction]))

    if fringe.isEmpty():
        return []

Both A* Search and UniformCostSearch work and find a solution; however, I am getting the same number of expanded search nodes, which is my question. Why is A* expanding 620 nodes if UCS expands 620 as well? (The target node expansion for A* in this scenario is 549.)


Solution

  • I think you're handling the costs incorrectly for both of your searches.

    For uniformCostSearch, make sure the priority of each node is the accumulated path cost rather than only the cost of the last step (the cost returned by getSuccessors). If you push only the step cost and it is constant, your priority queue behaves like a regular queue and the whole thing is a breadth-first search. Because the priority queue prefers older entries (those with lower counts) on ties, with uniform step costs that is no different from what you'd get by passing in the real cost (the old cost plus the new step's cost), but you should do it correctly in the first place:

    def uniformCostSearch(problem):
        # An empty list to store already expanded states
        closed = []
        fringe = PriorityQueue()
        fringe.push((problem.getStartState(), 0, []), 0)
    
        while not fringe.isEmpty():
            node, cost, directions = fringe.pop()
    
            if problem.isGoalState(node):
                return directions
    
            if not (node in closed):
                closed.append(node)
    
                for node, direction, step_cost in problem.getSuccessors(node):
                    fringe.push((node, cost + step_cost, directions + [direction]),
                                cost + step_cost)
    
        if fringe.isEmpty():
            return []
    

    Your A* search is even more mixed up about costs. The priority function ignores its input and always passes the same node (the start state) to the heuristic function, so the heuristic term is a constant. Because the cost of every expanded node is added to totalCost, each node gets a priority at least as high as the ones pushed before it. That makes the nodes get expanded in FIFO order, the same as the uniform cost search.
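To see the FIFO effect in isolation, here is a minimal sketch. It re-creates the same (priority, count, item) scheme as the question's queue; H_START, total_cost, and the item names are hypothetical stand-ins for the constant heuristic value and the growing totalCost counter, not values from the actual maze.

```python
import heapq


class PriorityQueue:
    """Same tie-breaking scheme as the question's queue: (priority, count, item)."""

    def __init__(self):
        self.heap = []
        self.count = 0

    def push(self, item, priority):
        heapq.heappush(self.heap, (priority, self.count, item))
        self.count += 1

    def pop(self):
        _, _, item = heapq.heappop(self.heap)
        return item


# Hypothetical stand-ins: a constant heuristic value for the start state and
# a running total that only ever grows, as in the question's lambda.
H_START = 7
total_cost = 0
queue = PriorityQueue()
pushed = []

# Each inner list plays the role of the successors pushed after one expansion.
for batch in [["a", "b"], ["c"], ["d", "e"]]:
    total_cost += 1  # grows once per expansion, never shrinks
    for name in batch:
        queue.push(name, total_cost + H_START)  # priority ignores the item
        pushed.append(name)

popped = [queue.pop() for _ in pushed]
print(popped == pushed)  # pop order matches push order
```

Since the priorities never decrease and ties are broken by insertion count, the queue cannot reorder anything, so the heuristic has no influence on which node is expanded next.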

    You need to make the priority function examine the cost stored in its argument, and pass the argument's node to the heuristic function. Try something like this:

    def aStarSearch(problem, heuristic):
        closed = []
        # A* uses the total cost up to current node + heuristic to goal to decide priority
        def cost_func(tup):
            node, cost_so_far, directions = tup   # unpack argument tuple
            return cost_so_far + heuristic(node, problem) # I'm guessing at heuristic's API
    
        fringe = PriorityQueueWithFunction(cost_func)
        fringe.push((problem.getStartState(), 0, []))
    
        while not fringe.isEmpty():
            node, cost, directions = fringe.pop()
    
            if problem.isGoalState(node):
                return directions
    
            if not (node in closed):
                closed.append(node)
    
                for node, direction, step_cost in problem.getSuccessors(node):
                    fringe.push((node, cost + step_cost, directions + [direction]))
    
        if fringe.isEmpty():
            return []
    

    One final suggestion is to use a set in place of your closed list. A set is much faster at membership testing with in than a list (constant time on average, rather than O(N)), and you can add new values to it in (amortized) constant time.
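Putting the pieces together, the sketch below runs the corrected priority (cost so far plus heuristic of the node itself) with a set for closed on a toy problem. Everything here is an assumption for illustration: GridProblem is a hypothetical open grid with unit step costs, not the maze from the question; manhattan and null_heuristic are hypothetical heuristics using the heuristic(state, problem) signature guessed above; and expansions are counted as calls to getSuccessors, which may not be how the 620 figure is measured.

```python
import heapq


class GridProblem:
    """Hypothetical open grid with no walls; every step costs 1."""

    def __init__(self, width, height, start, goal):
        self.width, self.height = width, height
        self.start, self.goal = start, goal
        self.expanded = 0  # number of getSuccessors calls

    def getStartState(self):
        return self.start

    def isGoalState(self, state):
        return state == self.goal

    def getSuccessors(self, state):
        self.expanded += 1
        x, y = state
        moves = [("North", 0, 1), ("South", 0, -1), ("East", 1, 0), ("West", -1, 0)]
        result = []
        for name, dx, dy in moves:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.width and 0 <= ny < self.height:
                result.append(((nx, ny), name, 1))
        return result


def manhattan(state, problem):
    return abs(state[0] - problem.goal[0]) + abs(state[1] - problem.goal[1])


def null_heuristic(state, problem):
    return 0  # with a zero heuristic, A* reduces to uniform cost search


def search(problem, heuristic):
    closed = set()  # set membership tests are constant time on average
    fringe = []
    count = 0  # insertion counter, breaks priority ties in FIFO order
    start = problem.getStartState()
    heapq.heappush(fringe, (heuristic(start, problem), count, start, 0, []))

    while fringe:
        _, _, node, cost, directions = heapq.heappop(fringe)
        if problem.isGoalState(node):
            return directions
        if node not in closed:
            closed.add(node)
            for succ, direction, step_cost in problem.getSuccessors(node):
                count += 1
                new_cost = cost + step_cost
                priority = new_cost + heuristic(succ, problem)
                entry = (priority, count, succ, new_cost, directions + [direction])
                heapq.heappush(fringe, entry)
    return []


ucs_problem = GridProblem(11, 11, (5, 5), (10, 5))
astar_problem = GridProblem(11, 11, (5, 5), (10, 5))
ucs_path = search(ucs_problem, null_heuristic)
astar_path = search(astar_problem, manhattan)
print(len(ucs_path), ucs_problem.expanded)
print(len(astar_path), astar_problem.expanded)
```

Both runs return a path of the same length, but the run with the Manhattan heuristic expands fewer nodes, which is the gap you should expect to see between A* and UCS once the priority depends on the node being pushed.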