"""2D Navigation and Pathfinding for SimVX.
Provides graph-based and grid-based A* pathfinding, plus a NavigationAgent2D
node that follows computed paths with steering and emits signals on arrival.
Usage:
from simvx.core import AStar2D, AStarGrid2D, NavigationAgent2D
# Graph-based pathfinding
astar = AStar2D()
astar.add_point(0, (0, 0))
astar.add_point(1, (10, 0))
astar.connect_points(0, 1)
path = astar.get_point_path(0, 1) # [(0,0), (10,0)]
# Grid-based pathfinding
grid = AStarGrid2D(20, 20, cell_size=32.0)
grid.set_solid(5, 5)
path = grid.find_path((0, 0), (10, 10))
# Navigation agent node
agent = NavigationAgent2D()
agent.set_navigation(astar)
agent.target_position = (10, 0)
"""
from __future__ import annotations
import heapq
import logging
import math
from typing import TYPE_CHECKING
from .nodes_2d.node2d import Node2D
from .descriptors import Property, Signal
from .math.types import Vec2
log = logging.getLogger(__name__)
if TYPE_CHECKING:
pass
__all__ = ["AStar2D", "AStarGrid2D", "NavigationAgent2D"]
[docs]
class AStar2D:
"""Graph-based A* pathfinding. Supports both grid-based and arbitrary graphs."""
__slots__ = ("_points", "_connections", "_weights", "_disabled")
def __init__(self):
self._points: dict[int, tuple[float, float]] = {}
self._connections: dict[int, set[int]] = {}
self._weights: dict[int, float] = {}
self._disabled: set[int] = set()
[docs]
def add_point(self, id: int, position: tuple[float, float], weight: float = 1.0):
"""Add a point to the graph. Weight scales traversal cost (default 1.0)."""
self._points[id] = (float(position[0]), float(position[1]))
self._weights[id] = weight
if id not in self._connections:
self._connections[id] = set()
[docs]
def remove_point(self, id: int):
"""Remove a point and all its connections."""
if id not in self._points:
return
del self._points[id]
for neighbor in self._connections.pop(id, set()):
self._connections.get(neighbor, set()).discard(id)
self._weights.pop(id, None)
self._disabled.discard(id)
[docs]
def connect_points(self, id1: int, id2: int, bidirectional: bool = True):
"""Connect two points. Both must already exist."""
self._connections.setdefault(id1, set()).add(id2)
if bidirectional:
self._connections.setdefault(id2, set()).add(id1)
[docs]
def disconnect_points(self, id1: int, id2: int, bidirectional: bool = True):
"""Remove connection between two points."""
self._connections.get(id1, set()).discard(id2)
if bidirectional:
self._connections.get(id2, set()).discard(id1)
[docs]
def set_point_disabled(self, id: int, disabled: bool = True):
"""Disable/enable a point. Disabled points are excluded from pathfinding."""
if disabled:
self._disabled.add(id)
else:
self._disabled.discard(id)
[docs]
def is_point_disabled(self, id: int) -> bool:
return id in self._disabled
[docs]
def has_point(self, id: int) -> bool:
return id in self._points
[docs]
def get_point_position(self, id: int) -> tuple[float, float]:
return self._points[id]
[docs]
def get_point_connections(self, id: int) -> set[int]:
return self._connections.get(id, set()).copy()
[docs]
def get_point_count(self) -> int:
return len(self._points)
[docs]
def get_closest_point(self, position: tuple[float, float]) -> int:
"""Return id of the closest non-disabled point to the given position."""
best_id, best_dist = -1, float("inf")
px, py = float(position[0]), float(position[1])
for pid, (x, y) in self._points.items():
if pid in self._disabled:
continue
d = (x - px) ** 2 + (y - py) ** 2
if d < best_dist:
best_dist = d
best_id = pid
return best_id
[docs]
def get_id_path(self, from_id: int, to_id: int) -> list[int]:
"""A* search returning list of point IDs from start to end. Empty list if no path."""
if from_id not in self._points or to_id not in self._points:
return []
if from_id == to_id:
return [from_id]
open_set: list[tuple[float, int, int]] = [(0.0, 0, from_id)] # (f, tiebreaker, id)
came_from: dict[int, int] = {}
g_score: dict[int, float] = {from_id: 0.0}
counter = 1 # Tiebreaker for heap stability
tx, ty = self._points[to_id]
while open_set:
_, _, current = heapq.heappop(open_set)
if current == to_id:
path = [current]
while current in came_from:
current = came_from[current]
path.append(current)
path.reverse()
return path
# Skip if we already found a better route to this node
cx, cy = self._points[current]
for neighbor in self._connections.get(current, set()):
if neighbor in self._disabled:
continue
nx, ny = self._points[neighbor]
edge_cost = math.hypot(nx - cx, ny - cy) * self._weights.get(neighbor, 1.0)
tentative_g = g_score[current] + edge_cost
if tentative_g < g_score.get(neighbor, float("inf")):
came_from[neighbor] = current
g_score[neighbor] = tentative_g
h = math.hypot(nx - tx, ny - ty)
heapq.heappush(open_set, (tentative_g + h, counter, neighbor))
counter += 1
return []
[docs]
def get_point_path(self, from_id: int, to_id: int) -> list[tuple[float, float]]:
"""A* search returning list of world positions."""
return [self._points[pid] for pid in self.get_id_path(from_id, to_id)]
[docs]
class AStarGrid2D:
"""Grid-based A* with automatic cell management.
Args:
width: Grid width in cells.
height: Grid height in cells.
cell_size: Size of each cell in world units.
diagonal: Allow diagonal movement (default True).
offset: World-space offset of the grid origin.
"""
__slots__ = ("_width", "_height", "_cell_size", "_diagonal", "_offset", "_solid", "_weights")
def __init__(
self,
width: int,
height: int,
cell_size: float = 1.0,
diagonal: bool = True,
offset: tuple[float, float] = (0.0, 0.0),
):
self._width = width
self._height = height
self._cell_size = cell_size
self._diagonal = diagonal
self._offset = offset
self._solid: set[tuple[int, int]] = set()
self._weights: dict[tuple[int, int], float] = {}
@property
def width(self) -> int:
return self._width
@property
def height(self) -> int:
return self._height
@property
def cell_size(self) -> float:
return self._cell_size
[docs]
def set_solid(self, x: int, y: int, solid: bool = True):
"""Mark a cell as solid (impassable) or clear it."""
if solid:
self._solid.add((x, y))
else:
self._solid.discard((x, y))
[docs]
def is_solid(self, x: int, y: int) -> bool:
return (x, y) in self._solid
[docs]
def set_weight(self, x: int, y: int, weight: float):
"""Set traversal weight for a cell. Default is 1.0; higher = costlier."""
self._weights[(x, y)] = weight
def _is_valid(self, x: int, y: int) -> bool:
return 0 <= x < self._width and 0 <= y < self._height and (x, y) not in self._solid
def _neighbors(self, x: int, y: int) -> list[tuple[int, int]]:
result = []
for dx, dy in ((0, 1), (0, -1), (1, 0), (-1, 0)):
nx, ny = x + dx, y + dy
if self._is_valid(nx, ny):
result.append((nx, ny))
if self._diagonal:
for dx, dy in ((1, 1), (1, -1), (-1, 1), (-1, -1)):
nx, ny = x + dx, y + dy
if self._is_valid(nx, ny):
result.append((nx, ny))
return result
[docs]
def find_path(self, from_cell: tuple[int, int], to_cell: tuple[int, int]) -> list[tuple[int, int]]:
"""Find shortest path between two grid cells. Returns list of (x,y) cells."""
fx, fy = from_cell
tx, ty = to_cell
if not self._is_valid(fx, fy) or not self._is_valid(tx, ty):
return []
if from_cell == to_cell:
return [from_cell]
open_set: list[tuple[float, int, tuple[int, int]]] = [(0.0, 0, from_cell)]
came_from: dict[tuple[int, int], tuple[int, int]] = {}
g_score: dict[tuple[int, int], float] = {from_cell: 0.0}
counter = 1
while open_set:
_, _, current = heapq.heappop(open_set)
if current == to_cell:
path = [current]
while current in came_from:
current = came_from[current]
path.append(current)
path.reverse()
return path
cx, cy = current
for nx, ny in self._neighbors(cx, cy):
neighbor = (nx, ny)
dx, dy = abs(nx - cx), abs(ny - cy)
base_cost = math.sqrt(2.0) if (dx + dy == 2) else 1.0
w = self._weights.get(neighbor, 1.0)
tentative_g = g_score[current] + base_cost * w
if tentative_g < g_score.get(neighbor, float("inf")):
came_from[neighbor] = current
g_score[neighbor] = tentative_g
h = math.hypot(nx - tx, ny - ty)
heapq.heappush(open_set, (tentative_g + h, counter, neighbor))
counter += 1
return []
[docs]
def find_path_world(self, from_pos: tuple[float, float], to_pos: tuple[float, float]) -> list[tuple[float, float]]:
"""Find path using world positions (auto-converts to/from grid coords)."""
from_cell = self._world_to_cell(from_pos)
to_cell = self._world_to_cell(to_pos)
return [self._cell_to_world(c) for c in self.find_path(from_cell, to_cell)]
def _world_to_cell(self, pos: tuple[float, float]) -> tuple[int, int]:
ox, oy = self._offset
return (int((pos[0] - ox) / self._cell_size), int((pos[1] - oy) / self._cell_size))
def _cell_to_world(self, cell: tuple[int, int]) -> tuple[float, float]:
ox, oy = self._offset
cx = cell[0] * self._cell_size + ox + self._cell_size / 2
cy = cell[1] * self._cell_size + oy + self._cell_size / 2
return (cx, cy)
[docs]
class NavigationAgent2D(Node2D):
"""2D node that follows A* paths with steering.
Assign a pathfinder (AStar2D or AStarGrid2D) via ``set_navigation()``,
set ``target_position``, and the agent moves toward it each physics frame.
Emits ``navigation_finished`` when the target is reached.
Usage:
agent = NavigationAgent2D(max_speed=200.0)
agent.set_navigation(my_astar)
agent.target_position = Vec2(500, 300)
"""
max_speed = Property(100.0, range=(0, 10000), hint="Max movement speed in units/sec", group="Navigation")
path_desired_distance = Property(4.0, range=(0.1, 100), hint="Distance to waypoint before advancing",
group="Navigation")
def __init__(self, max_speed: float = 100.0, path_desired_distance: float = 4.0, **kwargs):
super().__init__(**kwargs)
self.max_speed = max_speed
self.path_desired_distance = path_desired_distance
self.navigation_finished = Signal()
self._navigation: AStar2D | AStarGrid2D | None = None
self._path: list[tuple[float, float]] = []
self._path_index: int = 0
self._target_position: Vec2 = Vec2()
self._velocity: Vec2 = Vec2()
self._navigating: bool = False
@property
def target_position(self) -> Vec2:
return self._target_position
@target_position.setter
def target_position(self, value):
self._target_position = Vec2(value)
self._recompute_path()
@property
def velocity(self) -> Vec2:
"""Current velocity (read-only, computed each physics frame)."""
return Vec2(self._velocity)
@property
def is_navigation_finished(self) -> bool:
return not self._navigating
[docs]
def set_navigation(self, nav: AStar2D | AStarGrid2D):
"""Assign a pathfinder instance."""
self._navigation = nav
def _recompute_path(self):
"""Recompute the path from current position to target."""
self._path = []
self._path_index = 0
self._velocity = Vec2()
if self._navigation is None:
return
pos = (float(self.position.x), float(self.position.y))
target = (float(self._target_position.x), float(self._target_position.y))
if isinstance(self._navigation, AStarGrid2D):
self._path = self._navigation.find_path_world(pos, target)
elif isinstance(self._navigation, AStar2D):
from_id = self._navigation.get_closest_point(pos)
to_id = self._navigation.get_closest_point(target)
if from_id >= 0 and to_id >= 0:
self._path = self._navigation.get_point_path(from_id, to_id)
if self._path:
self._path_index = 0
self._navigating = True
else:
self._navigating = False
[docs]
def physics_process(self, dt: float):
if not self._navigating or not self._path:
self._velocity = Vec2()
return
# Advance past waypoints that are close enough
while self._path_index < len(self._path):
wp = self._path[self._path_index]
dx = wp[0] - self.position.x
dy = wp[1] - self.position.y
dist = math.hypot(dx, dy)
if dist <= self.path_desired_distance and self._path_index < len(self._path) - 1:
self._path_index += 1
else:
break
# Check if we've reached the final waypoint
wp = self._path[self._path_index]
dx = wp[0] - self.position.x
dy = wp[1] - self.position.y
dist = math.hypot(dx, dy)
if dist <= self.path_desired_distance and self._path_index >= len(self._path) - 1:
# Arrived at destination
self._velocity = Vec2()
self._navigating = False
self.navigation_finished()
return
# Steer toward current waypoint
if dist > 0:
speed = min(self.max_speed, dist / dt) if dt > 0 else self.max_speed
nx, ny = dx / dist, dy / dist
self._velocity = Vec2(nx * speed, ny * speed)
self.position = Vec2(
self.position.x + self._velocity.x * dt,
self.position.y + self._velocity.y * dt,
)