Source code for simvx.core.navigation

"""2D Navigation and Pathfinding for SimVX.

Provides graph-based and grid-based A* pathfinding, plus a NavigationAgent2D
node that follows computed paths with steering and emits signals on arrival.

Usage:
    from simvx.core import AStar2D, AStarGrid2D, NavigationAgent2D

    # Graph-based pathfinding
    astar = AStar2D()
    astar.add_point(0, (0, 0))
    astar.add_point(1, (10, 0))
    astar.connect_points(0, 1)
    path = astar.get_point_path(0, 1)  # [(0,0), (10,0)]

    # Grid-based pathfinding
    grid = AStarGrid2D(20, 20, cell_size=32.0)
    grid.set_solid(5, 5)
    path = grid.find_path((0, 0), (10, 10))

    # Navigation agent node
    agent = NavigationAgent2D()
    agent.set_navigation(astar)
    agent.target_position = (10, 0)
"""


from __future__ import annotations

import heapq
import logging
import math
from typing import TYPE_CHECKING

from .nodes_2d.node2d import Node2D
from .descriptors import Property, Signal
from .math.types import Vec2

log = logging.getLogger(__name__)

if TYPE_CHECKING:
    pass

__all__ = ["AStar2D", "AStarGrid2D", "NavigationAgent2D"]


[docs] class AStar2D: """Graph-based A* pathfinding. Supports both grid-based and arbitrary graphs.""" __slots__ = ("_points", "_connections", "_weights", "_disabled") def __init__(self): self._points: dict[int, tuple[float, float]] = {} self._connections: dict[int, set[int]] = {} self._weights: dict[int, float] = {} self._disabled: set[int] = set()
[docs] def add_point(self, id: int, position: tuple[float, float], weight: float = 1.0): """Add a point to the graph. Weight scales traversal cost (default 1.0).""" self._points[id] = (float(position[0]), float(position[1])) self._weights[id] = weight if id not in self._connections: self._connections[id] = set()
[docs] def remove_point(self, id: int): """Remove a point and all its connections.""" if id not in self._points: return del self._points[id] for neighbor in self._connections.pop(id, set()): self._connections.get(neighbor, set()).discard(id) self._weights.pop(id, None) self._disabled.discard(id)
[docs] def connect_points(self, id1: int, id2: int, bidirectional: bool = True): """Connect two points. Both must already exist.""" self._connections.setdefault(id1, set()).add(id2) if bidirectional: self._connections.setdefault(id2, set()).add(id1)
[docs] def disconnect_points(self, id1: int, id2: int, bidirectional: bool = True): """Remove connection between two points.""" self._connections.get(id1, set()).discard(id2) if bidirectional: self._connections.get(id2, set()).discard(id1)
[docs] def set_point_disabled(self, id: int, disabled: bool = True): """Disable/enable a point. Disabled points are excluded from pathfinding.""" if disabled: self._disabled.add(id) else: self._disabled.discard(id)
[docs] def is_point_disabled(self, id: int) -> bool: return id in self._disabled
[docs] def has_point(self, id: int) -> bool: return id in self._points
[docs] def get_point_position(self, id: int) -> tuple[float, float]: return self._points[id]
[docs] def get_point_connections(self, id: int) -> set[int]: return self._connections.get(id, set()).copy()
[docs] def get_point_count(self) -> int: return len(self._points)
[docs] def get_closest_point(self, position: tuple[float, float]) -> int: """Return id of the closest non-disabled point to the given position.""" best_id, best_dist = -1, float("inf") px, py = float(position[0]), float(position[1]) for pid, (x, y) in self._points.items(): if pid in self._disabled: continue d = (x - px) ** 2 + (y - py) ** 2 if d < best_dist: best_dist = d best_id = pid return best_id
[docs] def get_id_path(self, from_id: int, to_id: int) -> list[int]: """A* search returning list of point IDs from start to end. Empty list if no path.""" if from_id not in self._points or to_id not in self._points: return [] if from_id == to_id: return [from_id] open_set: list[tuple[float, int, int]] = [(0.0, 0, from_id)] # (f, tiebreaker, id) came_from: dict[int, int] = {} g_score: dict[int, float] = {from_id: 0.0} counter = 1 # Tiebreaker for heap stability tx, ty = self._points[to_id] while open_set: _, _, current = heapq.heappop(open_set) if current == to_id: path = [current] while current in came_from: current = came_from[current] path.append(current) path.reverse() return path # Skip if we already found a better route to this node cx, cy = self._points[current] for neighbor in self._connections.get(current, set()): if neighbor in self._disabled: continue nx, ny = self._points[neighbor] edge_cost = math.hypot(nx - cx, ny - cy) * self._weights.get(neighbor, 1.0) tentative_g = g_score[current] + edge_cost if tentative_g < g_score.get(neighbor, float("inf")): came_from[neighbor] = current g_score[neighbor] = tentative_g h = math.hypot(nx - tx, ny - ty) heapq.heappush(open_set, (tentative_g + h, counter, neighbor)) counter += 1 return []
[docs] def get_point_path(self, from_id: int, to_id: int) -> list[tuple[float, float]]: """A* search returning list of world positions.""" return [self._points[pid] for pid in self.get_id_path(from_id, to_id)]
[docs] class AStarGrid2D: """Grid-based A* with automatic cell management. Args: width: Grid width in cells. height: Grid height in cells. cell_size: Size of each cell in world units. diagonal: Allow diagonal movement (default True). offset: World-space offset of the grid origin. """ __slots__ = ("_width", "_height", "_cell_size", "_diagonal", "_offset", "_solid", "_weights") def __init__( self, width: int, height: int, cell_size: float = 1.0, diagonal: bool = True, offset: tuple[float, float] = (0.0, 0.0), ): self._width = width self._height = height self._cell_size = cell_size self._diagonal = diagonal self._offset = offset self._solid: set[tuple[int, int]] = set() self._weights: dict[tuple[int, int], float] = {} @property def width(self) -> int: return self._width @property def height(self) -> int: return self._height @property def cell_size(self) -> float: return self._cell_size
[docs] def set_solid(self, x: int, y: int, solid: bool = True): """Mark a cell as solid (impassable) or clear it.""" if solid: self._solid.add((x, y)) else: self._solid.discard((x, y))
[docs] def is_solid(self, x: int, y: int) -> bool: return (x, y) in self._solid
[docs] def set_weight(self, x: int, y: int, weight: float): """Set traversal weight for a cell. Default is 1.0; higher = costlier.""" self._weights[(x, y)] = weight
def _is_valid(self, x: int, y: int) -> bool: return 0 <= x < self._width and 0 <= y < self._height and (x, y) not in self._solid def _neighbors(self, x: int, y: int) -> list[tuple[int, int]]: result = [] for dx, dy in ((0, 1), (0, -1), (1, 0), (-1, 0)): nx, ny = x + dx, y + dy if self._is_valid(nx, ny): result.append((nx, ny)) if self._diagonal: for dx, dy in ((1, 1), (1, -1), (-1, 1), (-1, -1)): nx, ny = x + dx, y + dy if self._is_valid(nx, ny): result.append((nx, ny)) return result
[docs] def find_path(self, from_cell: tuple[int, int], to_cell: tuple[int, int]) -> list[tuple[int, int]]: """Find shortest path between two grid cells. Returns list of (x,y) cells.""" fx, fy = from_cell tx, ty = to_cell if not self._is_valid(fx, fy) or not self._is_valid(tx, ty): return [] if from_cell == to_cell: return [from_cell] open_set: list[tuple[float, int, tuple[int, int]]] = [(0.0, 0, from_cell)] came_from: dict[tuple[int, int], tuple[int, int]] = {} g_score: dict[tuple[int, int], float] = {from_cell: 0.0} counter = 1 while open_set: _, _, current = heapq.heappop(open_set) if current == to_cell: path = [current] while current in came_from: current = came_from[current] path.append(current) path.reverse() return path cx, cy = current for nx, ny in self._neighbors(cx, cy): neighbor = (nx, ny) dx, dy = abs(nx - cx), abs(ny - cy) base_cost = math.sqrt(2.0) if (dx + dy == 2) else 1.0 w = self._weights.get(neighbor, 1.0) tentative_g = g_score[current] + base_cost * w if tentative_g < g_score.get(neighbor, float("inf")): came_from[neighbor] = current g_score[neighbor] = tentative_g h = math.hypot(nx - tx, ny - ty) heapq.heappush(open_set, (tentative_g + h, counter, neighbor)) counter += 1 return []
[docs] def find_path_world(self, from_pos: tuple[float, float], to_pos: tuple[float, float]) -> list[tuple[float, float]]: """Find path using world positions (auto-converts to/from grid coords).""" from_cell = self._world_to_cell(from_pos) to_cell = self._world_to_cell(to_pos) return [self._cell_to_world(c) for c in self.find_path(from_cell, to_cell)]
def _world_to_cell(self, pos: tuple[float, float]) -> tuple[int, int]: ox, oy = self._offset return (int((pos[0] - ox) / self._cell_size), int((pos[1] - oy) / self._cell_size)) def _cell_to_world(self, cell: tuple[int, int]) -> tuple[float, float]: ox, oy = self._offset cx = cell[0] * self._cell_size + ox + self._cell_size / 2 cy = cell[1] * self._cell_size + oy + self._cell_size / 2 return (cx, cy)