Search code examples
pythonarraysalgorithmmatrixmultidimensional-array

algorithm to detect pools of 0s in a matrix


I'm writing an algorithm that detects regions of contiguous empty cells that are completely surrounded (orthogonally) by filled cells and do not extend to the edge of the grid. Let's call such regions "pools".

As you can see in the below visualization, there are three pool cells – (1, 1), (1, 3), and (2, 2) – forming the one and only pool in the grid. That is, both orthogonally and diagonally adjacent pool cells should be considered part of a single pool.

I have already implemented an algorithm that partially solves the problem: it correctly identifies pool cells via an orthogonal DFS, i.e. it never considers diagonal neighbors.

class Cell:
    x: int
    y: int
    is_filled: bool

    def __init__(self, x, y, is_filled):
        self.x = x
        self.y = y
        self.is_filled = is_filled

    def get_neighbor(self, grid, direction):
        if direction == "LEFT" and self.y > 0:
            return Cell(self.x, self.y - 1, grid[self.x][self.y - 1] == 1)
        if direction == "TOP" and self.x > 0:
            return Cell(self.x - 1, self.y, grid[self.x - 1][self.y] == 1)
        if direction == "RIGHT" and self.y < len(grid[0]) - 1:
            return Cell(self.x, self.y + 1, grid[self.x][self.y + 1] == 1)
        if direction == "BOTTOM" and self.x < len(grid) - 1:
            return Cell(self.x + 1, self.y, grid[self.x + 1][self.y] == 1)
        return None

def iterate_pool_cells(grid):
    def is_pool(cell):
        nonlocal visited
        nonlocal edge_reached
        if cell is None:  # Reached grid boundary, not a pool
            edge_reached = True
            return False
        if (cell.x, cell.y) in visited or cell.is_filled:  # Already visited or is land
            return True
        visited.add((cell.x, cell.y))  # Mark as visited
        is_enclosed = True
        is_enclosed = is_pool(cell.get_neighbor(grid, "LEFT")) and is_enclosed
        is_enclosed = is_pool(cell.get_neighbor(grid, "TOP")) and is_enclosed
        is_enclosed = is_pool(cell.get_neighbor(grid, "RIGHT")) and is_enclosed
        is_enclosed = is_pool(cell.get_neighbor(grid, "BOTTOM")) and is_enclosed
        return is_enclosed and not edge_reached

    visited = set()
    edge_reached = False
    for x in range(len(grid)):
        for y in range(len(grid[0])):
            cell = Cell(x, y, grid[x][y] == 1)
            if (x, y) not in visited and not cell.is_filled:
                edge_reached = False
                if is_pool(cell) and not edge_reached:
                    yield (x, y)

The problem lies in the second half of the algorithm, i.e. the iterator. I am aiming to only yield one cell per pool (it could be any cell in the pool, as long as it's just one). However, right now the iterator will yield all three of the cells from the example, because it thinks of them as separate, unconnected pools.

I want the three cells in the example to be viewed as belonging to the same pool, such that only one cell is yielded per pool.

Can someone please steer me in the right direction on how to solve this? Can I maybe do this after the DFS is over, by clustering diagonally adjacent pool cells?

Here's a test case involving the grid from the image:

grid = [
  [ 0, 1, 0, 1, 0 ],
  [ 1, 0, 1, 0, 1 ],
  [ 1, 1, 0, 1, 1 ],
  [ 1, 0, 1, 0, 1 ],
  [ 0, 0, 1, 0, 0 ],
  [ 1, 0, 1, 0, 1 ],
]

for pool_cell in iterate_pool_cells(grid):
    print(f"Pool starts at cell: {pool_cell}")

Solution

  • There are many ways you could approach this. Although you have it working for the first phase, I would suggest creating the graph in such a way that cells don't need access to the grid anymore, but reference each other directly. You can define two types of neighbors for a given cell: the list of neighbors with a "straight" connection, and another list of neighbors with a diagonal connection.

    The second list would then be used in the final phase where you identify pools.

    Here is an implementation of this:

    class Cell:
        def __init__(self, x, y, west, nw, north, ne):
            self.x = x
            self.y = y
            self.neighbors = list(filter(None, [west, north]))
            self.diagonals = list(filter(None, [nw, ne]))
            for neighbor in self.neighbors:
                neighbor.neighbors.append(self)
            for neighbor in self.diagonals:
                neighbor.diagonals.append(self)
    
        def __repr__(self):
            return f"({self.y},{self.x})"
            
    
    def create_cells(grid):
        leak_cells = set() # 0-cells at the boundary
        water_cells = set() # internal 0-cells
        height = len(grid)
        width = len(grid[0])
        above = [None] * (width + 2)
        for y, row in enumerate(grid):
            current = [None]
            for x, is_land in enumerate(row):
                cell = None if is_land else Cell(x, y, current[-1], *above[x:x+3])
                if cell:
                    if x in (0, width - 1) or y in (0, height - 1):
                        leak_cells.add(cell)
                    else:
                        water_cells.add(cell)
                current.append(cell)
            current.append(None)
            above = current
        return leak_cells, water_cells
    
    def apply_leaks(leak_cells, water_cells):
        # Flood the "leak" into water cells, converting them
        while leak_cells:
            new_leak_cells = set()
            for leak in leak_cells:
                for neighbor in leak.neighbors:
                    if neighbor in water_cells:
                        new_leak_cells.add(neighbor)
                        water_cells.remove(neighbor)
            leak_cells = new_leak_cells
    
    def collect_pools(water_cells):
        for cell in list(water_cells):
            if cell in water_cells:
                water_cells.remove(cell)
                pool = [cell]
                for cell in pool:
                    for neighbor in cell.neighbors + cell.diagonals:
                        if neighbor in water_cells:
                            pool.append(neighbor)
                            water_cells.remove(neighbor)
                yield pool
    
    
    grid = [
      [ 0, 1, 0, 1, 0 ],
      [ 1, 0, 1, 0, 1 ],
      [ 1, 1, 0, 1, 1 ],
      [ 1, 0, 1, 0, 1 ],
      [ 0, 0, 1, 0, 0 ],
      [ 1, 0, 1, 0, 1 ],
    ]
    leak_cells, water_cells = create_cells(grid)
    apply_leaks(leak_cells, water_cells)
    pools = list(collect_pools(water_cells))
    print(pools)
    

    But surely you could keep your code as it is, and implement the pool-search using 8-neighborhood, just like in the above code where I have:

    for neighbor in cell.neighbors + cell.diagonals