"""3D navigation mesh system — navmesh generation, A* pathfinding, agents, obstacles.
Provides NavigationMesh3D for defining walkable surfaces, NavigationServer3D as a
singleton coordinator, NavigationAgent3D for autonomous path-following, and
NavigationObstacle3D for dynamic obstacle avoidance.
"""
from __future__ import annotations
import heapq
import logging
import math
import random
from itertools import pairwise
import numpy as np
from .nodes_3d.node3d import Node3D
from .descriptors import Property, Signal
from .math.types import Vec3
log = logging.getLogger(__name__)
__all__ = [
"NavigationMesh3D",
"NavigationRegion3D",
"NavigationAgent3D",
"NavigationServer3D",
"NavigationObstacle3D",
]
# ============================================================================
# NavigationMesh3D — navmesh data structure with A* pathfinding
# ============================================================================
[docs]
class NavigationMesh3D:
"""3D navigation mesh with triangle-based A* pathfinding.
Stores walkable geometry as triangles and builds an adjacency graph for
pathfinding. Supports manual polygon insertion and automated bake from
level geometry.
"""
def __init__(self):
self._vertices: list[list[float]] = [] # flat list of [x, y, z]
self._triangles: list[list[int]] = [] # each is [i0, i1, i2]
self._adjacency: dict[int, set[int]] = {} # triangle index -> neighbor triangle indices
self._obstacles: list[list[list[float]]] = [] # each obstacle is a list of [x, z] boundary points
self._blocked: set[int] = set() # triangle indices blocked by obstacles
self._dirty = True
# -- Public numpy views --
@property
def vertices(self) -> np.ndarray:
"""(N, 3) float32 array of mesh vertices."""
if not self._vertices:
return np.zeros((0, 3), dtype=np.float32)
return np.array(self._vertices, dtype=np.float32)
@property
def triangles(self) -> np.ndarray:
"""(M, 3) int32 array of triangle vertex indices."""
if not self._triangles:
return np.zeros((0, 3), dtype=np.int32)
return np.array(self._triangles, dtype=np.int32)
@property
def triangle_count(self) -> int:
return len(self._triangles)
# -- Polygon insertion --
[docs]
def add_polygon(self, vertices: list[Vec3], cell_size: float = 0.0) -> None:
"""Add a walkable polygon.
Without ``cell_size`` the polygon is fan-triangulated from the first
vertex (fast, 2 triangles for a quad). When ``cell_size`` is positive
the polygon's axis-aligned bounding box is subdivided into a regular
grid of right-triangles, keeping only cells whose centres fall inside
the polygon. The finer mesh is required for obstacle carving to work
at useful resolution.
Args:
vertices: 3+ coplanar points defining the polygon boundary.
cell_size: When > 0, grid cell size for subdivision. Typical
values: 0.5 -- 2.0 depending on obstacle density.
"""
if len(vertices) < 3:
raise ValueError("Polygon requires at least 3 vertices")
if cell_size > 0:
self._add_polygon_subdivided(vertices, cell_size)
else:
base_idx = len(self._vertices)
for v in vertices:
self._vertices.append([v.x, v.y, v.z])
for i in range(1, len(vertices) - 1):
self._triangles.append([base_idx, base_idx + i, base_idx + i + 1])
self._dirty = True
def _add_polygon_subdivided(self, vertices: list[Vec3], cell_size: float) -> None:
"""Grid-subdivide a polygon into fine triangles for obstacle carving."""
poly_xz = [[v.x, v.z] for v in vertices]
# Use average Y for the flat surface
avg_y = sum(v.y for v in vertices) / len(vertices)
# Bounding box
xs = [v.x for v in vertices]
zs = [v.z for v in vertices]
x_min, x_max = min(xs), max(xs)
z_min, z_max = min(zs), max(zs)
nx = max(1, int(math.ceil((x_max - x_min) / cell_size)))
nz = max(1, int(math.ceil((z_max - z_min) / cell_size)))
dx = (x_max - x_min) / nx
dz = (z_max - z_min) / nz
# Create grid vertices
base = len(self._vertices)
grid: dict[tuple[int, int], int] = {} # (ix, iz) -> vertex index
for iz in range(nz + 1):
for ix in range(nx + 1):
gx = x_min + ix * dx
gz = z_min + iz * dz
idx = base + len(grid)
grid[(ix, iz)] = idx
self._vertices.append([gx, avg_y, gz])
# Create triangles for cells whose centre is inside the polygon
for iz in range(nz):
for ix in range(nx):
cx = x_min + (ix + 0.5) * dx
cz = z_min + (iz + 0.5) * dz
if not _point_in_polygon_xz(cx, cz, poly_xz):
continue
i0 = grid[(ix, iz)]
i1 = grid[(ix + 1, iz)]
i2 = grid[(ix + 1, iz + 1)]
i3 = grid[(ix, iz + 1)]
self._triangles.append([i0, i1, i2])
self._triangles.append([i0, i2, i3])
[docs]
def add_triangle(self, v0: Vec3, v1: Vec3, v2: Vec3) -> None:
"""Add a single walkable triangle."""
base_idx = len(self._vertices)
self._vertices.append([v0.x, v0.y, v0.z])
self._vertices.append([v1.x, v1.y, v1.z])
self._vertices.append([v2.x, v2.y, v2.z])
self._triangles.append([base_idx, base_idx + 1, base_idx + 2])
self._dirty = True
[docs]
def add_obstacle(self, vertices: list[Vec3]) -> None:
"""Subtract an obstacle region from the walkable area.
The polygon is projected onto the XZ plane. Any navmesh triangle whose
centroid falls inside the obstacle polygon (or whose area overlaps it
significantly) is marked as blocked and excluded from pathfinding.
Args:
vertices: 3+ points defining the obstacle boundary (Y is ignored).
"""
if len(vertices) < 3:
raise ValueError("Obstacle polygon requires at least 3 vertices")
self._obstacles.append([[v.x, v.z] for v in vertices])
self._dirty = True
[docs]
def clear(self) -> None:
"""Remove all polygons, obstacles, and cached data."""
self._vertices.clear()
self._triangles.clear()
self._adjacency.clear()
self._obstacles.clear()
self._blocked.clear()
self._dirty = True
# -- Geometry baking --
[docs]
def bake_from_geometry(
self,
mesh_vertices: np.ndarray,
mesh_indices: np.ndarray,
agent_radius: float = 0.5,
agent_height: float = 2.0,
max_slope: float = 45.0,
cell_size: float = 0.3,
cell_height: float = 0.2,
) -> None:
"""Generate navmesh from level geometry using simplified Recast-style algorithm.
Steps:
1. Voxelize geometry into a height field
2. Mark walkable voxels (slope < max_slope, clearance > agent_height)
3. Build regions from connected walkable areas
4. Extract contours and triangulate
Args:
mesh_vertices: (N, 3) float32 array of source mesh vertices.
mesh_indices: (M, 3) int32 array of triangle indices.
agent_radius: Agent capsule radius for erosion.
agent_height: Minimum clearance height.
max_slope: Maximum walkable slope in degrees.
cell_size: Horizontal voxel size.
cell_height: Vertical voxel size.
"""
mesh_vertices = np.asarray(mesh_vertices, dtype=np.float32)
mesh_indices = np.asarray(mesh_indices, dtype=np.int32)
if mesh_vertices.ndim != 2 or mesh_vertices.shape[1] != 3:
raise ValueError("mesh_vertices must be (N, 3)")
if mesh_indices.ndim != 2 or mesh_indices.shape[1] != 3:
raise ValueError("mesh_indices must be (M, 3)")
# Step 1: Compute bounding box
bmin = mesh_vertices.min(axis=0)
bmax = mesh_vertices.max(axis=0)
grid_w = max(1, int(math.ceil((bmax[0] - bmin[0]) / cell_size)))
grid_d = max(1, int(math.ceil((bmax[2] - bmin[2]) / cell_size)))
# Step 2: Rasterize triangles into height field, filter by slope
cos_max = math.cos(math.radians(max_slope))
up = np.array([0.0, 1.0, 0.0], dtype=np.float32)
# height_field[x, z] = list of (y_min, y_max) spans
height_field: dict[tuple[int, int], list[float]] = {}
for tri in mesh_indices:
v0, v1, v2 = mesh_vertices[tri[0]], mesh_vertices[tri[1]], mesh_vertices[tri[2]]
# Compute triangle normal
e1 = v1 - v0
e2 = v2 - v0
normal = np.cross(e1, e2)
norm_len = np.linalg.norm(normal)
if norm_len < 1e-10:
continue
normal /= norm_len
# Check slope
if abs(np.dot(normal, up)) < cos_max:
continue
# Rasterize: project triangle onto xz grid
tri_verts = np.array([v0, v1, v2])
tmin = tri_verts.min(axis=0)
tmax = tri_verts.max(axis=0)
ix0 = max(0, int((tmin[0] - bmin[0]) / cell_size))
ix1 = min(grid_w - 1, int((tmax[0] - bmin[0]) / cell_size))
iz0 = max(0, int((tmin[2] - bmin[2]) / cell_size))
iz1 = min(grid_d - 1, int((tmax[2] - bmin[2]) / cell_size))
for ix in range(ix0, ix1 + 1):
for iz in range(iz0, iz1 + 1):
cx = bmin[0] + (ix + 0.5) * cell_size
cz = bmin[2] + (iz + 0.5) * cell_size
# Point-in-triangle test (projected to xz)
y = _interpolate_height(v0, v1, v2, cx, cz)
if y is not None:
key = (ix, iz)
if key not in height_field:
height_field[key] = []
height_field[key].append(y)
# Step 3: Build walkable cells — keep highest walkable surface per cell
walkable: dict[tuple[int, int], float] = {}
for key, heights in height_field.items():
walkable[key] = max(heights)
# Step 4: Erode by agent radius
erode_cells = max(1, int(math.ceil(agent_radius / cell_size)))
eroded: dict[tuple[int, int], float] = {}
for (ix, iz), y in walkable.items():
walkable_neighbors = True
for dx in range(-erode_cells, erode_cells + 1):
for dz in range(-erode_cells, erode_cells + 1):
if (ix + dx, iz + dz) not in walkable:
walkable_neighbors = False
break
if not walkable_neighbors:
break
if walkable_neighbors:
eroded[(ix, iz)] = y
# Step 5: Build regions via flood fill
visited: set[tuple[int, int]] = set()
regions: list[list[tuple[int, int]]] = []
for cell in eroded:
if cell in visited:
continue
region: list[tuple[int, int]] = []
stack = [cell]
while stack:
c = stack.pop()
if c in visited or c not in eroded:
continue
# Check height continuity
if region and abs(eroded[c] - eroded[region[0]]) > agent_height:
continue
visited.add(c)
region.append(c)
ix, iz = c
for nx, nz in [(ix + 1, iz), (ix - 1, iz), (ix, iz + 1), (ix, iz - 1)]:
if (nx, nz) not in visited and (nx, nz) in eroded:
stack.append((nx, nz))
if len(region) >= 2:
regions.append(region)
# Step 6: Triangulate each region
self._vertices.clear()
self._triangles.clear()
for region in regions:
self._triangulate_region(region, eroded, bmin, cell_size)
self._dirty = True
def _triangulate_region(
self,
cells: list[tuple[int, int]],
heights: dict[tuple[int, int], float],
bmin: np.ndarray,
cell_size: float,
) -> None:
"""Triangulate a connected region of walkable cells using a grid-based approach."""
cell_set = set(cells)
# Map cell -> vertex index
cell_to_idx: dict[tuple[int, int], int] = {}
base = len(self._vertices)
for ix, iz in cells:
idx = base + len(cell_to_idx)
cell_to_idx[(ix, iz)] = idx
cx = bmin[0] + (ix + 0.5) * cell_size
cz = bmin[2] + (iz + 0.5) * cell_size
self._vertices.append([cx, heights[(ix, iz)], cz])
# Create triangles from adjacent cell quads
for ix, iz in cells:
# Right and down neighbors form a quad -> 2 triangles
r = (ix + 1, iz)
d = (ix, iz + 1)
rd = (ix + 1, iz + 1)
if r in cell_set and d in cell_set and rd in cell_set:
i0 = cell_to_idx[(ix, iz)]
i1 = cell_to_idx[r]
i2 = cell_to_idx[rd]
i3 = cell_to_idx[d]
self._triangles.append([i0, i1, i2])
self._triangles.append([i0, i2, i3])
# -- Adjacency graph --
def _build_adjacency(self) -> None:
"""Build triangle adjacency from shared edges and mark obstacle-blocked triangles."""
if not self._dirty:
return
self._adjacency.clear()
self._blocked.clear()
# Merge vertices at the same position (quantized to avoid float issues)
SNAP = 1e-4
pos_to_canonical: dict[tuple[int, int, int], int] = {}
vertex_map: dict[int, int] = {} # original index -> canonical index
for i, v in enumerate(self._vertices):
key = (round(v[0] / SNAP), round(v[1] / SNAP), round(v[2] / SNAP))
if key not in pos_to_canonical:
pos_to_canonical[key] = i
vertex_map[i] = pos_to_canonical[key]
# Map edge (sorted canonical vertex pair) -> list of triangle indices
edge_to_tris: dict[tuple[int, int], list[int]] = {}
for ti, tri in enumerate(self._triangles):
self._adjacency.setdefault(ti, set())
for j in range(3):
ca = vertex_map[tri[j]]
cb = vertex_map[tri[(j + 1) % 3]]
edge = (min(ca, cb), max(ca, cb))
if edge not in edge_to_tris:
edge_to_tris[edge] = []
edge_to_tris[edge].append(ti)
for tris in edge_to_tris.values():
for i, tri_a in enumerate(tris):
for j in range(i + 1, len(tris)):
self._adjacency[tri_a].add(tris[j])
self._adjacency[tris[j]].add(tri_a)
# Mark triangles blocked by obstacles
if self._obstacles:
for ti in range(len(self._triangles)):
if self._is_triangle_blocked(ti):
self._blocked.add(ti)
self._dirty = False
def _is_triangle_blocked(self, tri_idx: int) -> bool:
"""Check if a triangle overlaps any obstacle polygon (XZ projection).
A triangle is blocked if its centroid lies inside an obstacle OR any of
its three vertices lie inside an obstacle. This catches both small
obstacles fully inside a large triangle and large obstacles covering
triangle edges.
"""
tri = self._triangles[tri_idx]
v0, v1, v2 = self._vertices[tri[0]], self._vertices[tri[1]], self._vertices[tri[2]]
cx = (v0[0] + v1[0] + v2[0]) / 3.0
cz = (v0[2] + v1[2] + v2[2]) / 3.0
points_xz = [(cx, cz), (v0[0], v0[2]), (v1[0], v1[2]), (v2[0], v2[2])]
for obstacle in self._obstacles:
for px, pz in points_xz:
if _point_in_polygon_xz(px, pz, obstacle):
return True
return False
# -- Triangle utilities --
def _triangle_centroid(self, tri_idx: int) -> Vec3:
"""Return the centroid of a triangle."""
tri = self._triangles[tri_idx]
v0, v1, v2 = self._vertices[tri[0]], self._vertices[tri[1]], self._vertices[tri[2]]
return Vec3(
(v0[0] + v1[0] + v2[0]) / 3.0,
(v0[1] + v1[1] + v2[1]) / 3.0,
(v0[2] + v1[2] + v2[2]) / 3.0,
)
def _point_in_triangle_3d(self, point: Vec3, tri_idx: int) -> bool:
"""Test if a point (projected to xz) lies within a triangle."""
tri = self._triangles[tri_idx]
v0, v1, v2 = self._vertices[tri[0]], self._vertices[tri[1]], self._vertices[tri[2]]
return _point_in_triangle_xz(point.x, point.z, v0[0], v0[2], v1[0], v1[2], v2[0], v2[2])
def _closest_point_on_triangle(self, point: Vec3, tri_idx: int) -> Vec3:
"""Find the closest point on a triangle to the given point."""
tri = self._triangles[tri_idx]
a = np.array(self._vertices[tri[0]], dtype=np.float64)
b = np.array(self._vertices[tri[1]], dtype=np.float64)
c = np.array(self._vertices[tri[2]], dtype=np.float64)
p = np.array([point.x, point.y, point.z], dtype=np.float64)
return Vec3(*_closest_point_on_triangle_np(p, a, b, c))
def _find_triangle(self, point: Vec3) -> int | None:
"""Find a non-blocked triangle containing a point (xz projection). Returns index or None."""
for i in range(len(self._triangles)):
if i not in self._blocked and self._point_in_triangle_3d(point, i):
return i
return None
# -- Pathfinding --
[docs]
def find_path(self, start: Vec3, end: Vec3) -> list[Vec3]:
"""A* pathfinding on the navmesh triangle graph.
Finds the shortest path from start to end by searching through connected
triangles. Returns a list of waypoints (triangle centroids + start/end)
forming the path, or an empty list if no path exists.
Args:
start: Start position in world space.
end: End position in world space.
Returns:
List of Vec3 waypoints, or empty list if unreachable.
"""
self._build_adjacency()
if not self._triangles:
return []
start_tri = self._find_triangle(start)
end_tri = self._find_triangle(end)
if start_tri is None or end_tri is None:
# Try snapping to closest triangle
start_tri = self._find_closest_triangle(start)
end_tri = self._find_closest_triangle(end)
if start_tri is None or end_tri is None:
return []
if start_tri == end_tri:
return [Vec3(start), Vec3(end)]
# A* search over triangle graph
open_set: list[tuple[float, int, int | None]] = [] # (f_cost, tri_idx, parent)
heapq.heappush(open_set, (0.0, start_tri, -1))
came_from: dict[int, int] = {}
g_score: dict[int, float] = {start_tri: 0.0}
end_centroid = self._triangle_centroid(end_tri)
while open_set:
f, current, _ = heapq.heappop(open_set)
if current == end_tri:
# Reconstruct path
path_tris = [current]
while current in came_from:
current = came_from[current]
path_tris.append(current)
path_tris.reverse()
# Build waypoint list: start -> centroids -> end
waypoints = [Vec3(start)]
for ti in path_tris[1:-1]:
waypoints.append(self._triangle_centroid(ti))
waypoints.append(Vec3(end))
return waypoints
if f > g_score.get(current, float("inf")) + _vec3_dist(self._triangle_centroid(current), end_centroid):
continue
for neighbor in self._adjacency.get(current, ()):
if neighbor in self._blocked:
continue
nc = self._triangle_centroid(neighbor)
cc = self._triangle_centroid(current)
tentative_g = g_score[current] + _vec3_dist(cc, nc)
if tentative_g < g_score.get(neighbor, float("inf")):
came_from[neighbor] = current
g_score[neighbor] = tentative_g
h = _vec3_dist(nc, end_centroid)
heapq.heappush(open_set, (tentative_g + h, neighbor, current))
return [] # No path found
# -- Spatial queries --
[docs]
def get_closest_point(self, point: Vec3) -> Vec3:
"""Snap a point to the nearest navmesh surface.
Args:
point: Query point in world space.
Returns:
Closest point on the navmesh surface.
"""
if not self._triangles:
return Vec3(point)
best_point = Vec3(point)
best_dist_sq = float("inf")
for i in range(len(self._triangles)):
cp = self._closest_point_on_triangle(point, i)
d = (cp - point).length_squared()
if d < best_dist_sq:
best_dist_sq = d
best_point = cp
return best_point
[docs]
def is_point_on_mesh(self, point: Vec3, tolerance: float = 0.5) -> bool:
"""Test if a point is on the walkable area.
Args:
point: Query point in world space.
tolerance: Maximum distance from navmesh surface to still count.
Returns:
True if point is within tolerance of the navmesh.
"""
if not self._triangles:
return False
cp = self.get_closest_point(point)
return (cp - point).length_squared() <= tolerance * tolerance
[docs]
def sample_position(self, center: Vec3, radius: float, max_attempts: int = 30) -> Vec3 | None:
"""Sample a random point near center that lies on the navmesh.
Args:
center: Center of the sampling sphere.
radius: Maximum distance from center.
max_attempts: Number of random samples to try.
Returns:
A random point on the navmesh within radius, or None if not found.
"""
if not self._triangles:
return None
for _ in range(max_attempts):
# Random offset in sphere
dx = random.uniform(-radius, radius)
dy = random.uniform(-radius, radius)
dz = random.uniform(-radius, radius)
candidate = center + Vec3(dx, dy, dz)
cp = self.get_closest_point(candidate)
if (cp - center).length() <= radius:
return cp
return None
def _find_closest_triangle(self, point: Vec3) -> int | None:
"""Find the non-blocked triangle whose centroid is closest to point."""
if not self._triangles:
return None
best_idx: int | None = None
best_dist_sq = float("inf")
for i in range(len(self._triangles)):
if i in self._blocked:
continue
c = self._triangle_centroid(i)
d = (c - point).length_squared()
if d < best_dist_sq:
best_dist_sq = d
best_idx = i
return best_idx
# ============================================================================
# NavigationServer3D — singleton managing all navigation regions
# ============================================================================
[docs]
class NavigationServer3D:
"""Global navigation server that manages all registered navigation regions.
Provides unified pathfinding and spatial queries across all active regions.
"""
_instance: NavigationServer3D | None = None
def __init__(self):
self._regions: list[NavigationRegion3D] = []
self._obstacles: list[NavigationObstacle3D] = []
[docs]
@classmethod
def get_singleton(cls) -> NavigationServer3D:
"""Return the global NavigationServer3D instance (created on first access)."""
if cls._instance is None:
cls._instance = cls()
return cls._instance
[docs]
@classmethod
def reset(cls) -> None:
"""Reset the singleton (useful for testing)."""
cls._instance = None
[docs]
def register_region(self, region: NavigationRegion3D) -> None:
"""Register a navigation region."""
if region not in self._regions:
self._regions.append(region)
[docs]
def unregister_region(self, region: NavigationRegion3D) -> None:
"""Unregister a navigation region."""
if region in self._regions:
self._regions.remove(region)
[docs]
def register_obstacle(self, obstacle: NavigationObstacle3D) -> None:
"""Register a dynamic obstacle."""
if obstacle not in self._obstacles:
self._obstacles.append(obstacle)
[docs]
def unregister_obstacle(self, obstacle: NavigationObstacle3D) -> None:
"""Unregister a dynamic obstacle."""
if obstacle in self._obstacles:
self._obstacles.remove(obstacle)
[docs]
def find_path(self, start: Vec3, end: Vec3) -> list[Vec3]:
"""Find a path from start to end across all active navigation regions.
Queries each enabled region and returns the shortest valid path found.
Args:
start: Start position in world space.
end: End position in world space.
Returns:
List of Vec3 waypoints, or empty list if unreachable.
"""
best_path: list[Vec3] = []
best_cost = float("inf")
for region in self._regions:
if not region.enabled or region.navigation_mesh is None:
continue
path = region.navigation_mesh.find_path(start, end)
if path:
cost = _path_length(path)
if cost < best_cost:
best_cost = cost
best_path = path
return best_path
[docs]
def get_closest_point(self, point: Vec3) -> Vec3:
"""Find the closest point on any active navmesh surface.
Args:
point: Query point in world space.
Returns:
Closest point on any navmesh, or the input point if no regions exist.
"""
best_point = Vec3(point)
best_dist_sq = float("inf")
for region in self._regions:
if not region.enabled or region.navigation_mesh is None:
continue
cp = region.navigation_mesh.get_closest_point(point)
d = (cp - point).length_squared()
if d < best_dist_sq:
best_dist_sq = d
best_point = cp
return best_point
@property
def obstacles(self) -> list[NavigationObstacle3D]:
"""All registered dynamic obstacles."""
return list(self._obstacles)
# ============================================================================
# NavigationRegion3D — node that holds a NavigationMesh3D
# ============================================================================
[docs]
class NavigationRegion3D(Node3D):
"""Node that holds a NavigationMesh3D and registers it with the server.
Attach a NavigationMesh3D to this node and add it to the scene tree to make
its walkable surface available for pathfinding.
"""
enabled = Property(True, hint="Whether this region participates in pathfinding")
def __init__(self, navigation_mesh: NavigationMesh3D | None = None, **kwargs):
super().__init__(**kwargs)
self.navigation_mesh: NavigationMesh3D | None = navigation_mesh
[docs]
def enter_tree(self) -> None:
NavigationServer3D.get_singleton().register_region(self)
[docs]
def exit_tree(self) -> None:
NavigationServer3D.get_singleton().unregister_region(self)
# ============================================================================
# NavigationAgent3D — 3D pathfinding agent
# ============================================================================
[docs]
class NavigationAgent3D(Node3D):
"""3D pathfinding agent that follows paths on the navigation mesh.
Computes a path to target_position and advances along it each physics frame.
Emits navigation_finished when the target is reached.
Attach as a child of a Node3D whose position you want to steer.
"""
target_desired_distance = Property(1.0, range=(0.01, 100.0), hint="Distance to target to consider reached")
path_desired_distance = Property(1.0, range=(0.01, 100.0), hint="Distance to path point before advancing")
max_speed = Property(10.0, range=(0.0, 1000.0), hint="Maximum movement speed")
avoidance_radius = Property(0.5, range=(0.0, 50.0), hint="Radius for obstacle avoidance")
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._target_position: Vec3 = Vec3()
self._path: list[Vec3] = []
self._path_index: int = 0
self._navigation_finished_flag: bool = True
self.velocity: Vec3 = Vec3()
self.navigation_finished = Signal()
@property
def target_position(self) -> Vec3:
return self._target_position
@target_position.setter
def target_position(self, value: Vec3) -> None:
self._target_position = Vec3(value)
self._recompute_path()
def _recompute_path(self) -> None:
"""Recompute path from current position to target."""
origin = self._get_origin()
server = NavigationServer3D.get_singleton()
self._path = server.find_path(origin, self._target_position)
self._path_index = 1 if len(self._path) > 1 else 0
self._navigation_finished_flag = len(self._path) == 0
def _get_origin(self) -> Vec3:
"""Get the world position of the agent."""
return Vec3(self.world_position)
[docs]
def is_navigation_finished(self) -> bool:
"""Check if the agent has reached its target or has no path."""
return self._navigation_finished_flag
[docs]
def get_next_path_position(self) -> Vec3:
"""Get the next waypoint position the agent is heading toward.
Returns:
Next waypoint, or current position if path is empty.
"""
if not self._path or self._path_index >= len(self._path):
return self._get_origin()
return self._path[self._path_index]
[docs]
def physics_process(self, dt: float) -> None:
"""Advance along the path, computing steering velocity."""
if self._navigation_finished_flag or not self._path:
self.velocity = Vec3()
return
origin = self._get_origin()
# Advance past reached waypoints
while self._path_index < len(self._path):
wp = self._path[self._path_index]
dist = (wp - origin).length()
if self._path_index == len(self._path) - 1:
# Final waypoint — use target distance
if dist <= self.target_desired_distance:
self._navigation_finished_flag = True
self.velocity = Vec3()
self.navigation_finished()
return
break
else:
if dist <= self.path_desired_distance:
self._path_index += 1
else:
break
if self._path_index >= len(self._path):
self._navigation_finished_flag = True
self.velocity = Vec3()
self.navigation_finished()
return
# Compute steering velocity toward next waypoint
target_wp = self._path[self._path_index]
direction = target_wp - origin
dist = direction.length()
if dist > 1e-6:
direction = direction.normalized()
speed = min(self.max_speed, dist / dt) if dt > 0 else self.max_speed
self.velocity = direction * speed
else:
self.velocity = Vec3()
# Simple obstacle avoidance — steer away from nearby obstacles
self._apply_avoidance(origin)
def _apply_avoidance(self, origin: Vec3) -> None:
"""Apply simple obstacle avoidance to the current velocity."""
if self.avoidance_radius <= 0:
return
server = NavigationServer3D.get_singleton()
for obstacle in server.obstacles:
if not obstacle.visible:
continue
obs_pos = obstacle.world_position
to_agent = origin - obs_pos
dist = to_agent.length()
combined_radius = self.avoidance_radius + obstacle.radius
if dist < combined_radius and dist > 1e-6:
# Push velocity away from obstacle
push_dir = to_agent.normalized()
push_strength = (combined_radius - dist) / combined_radius
self.velocity = self.velocity + push_dir * (self.max_speed * push_strength * 0.5)
# Clamp to max speed
vel_len = self.velocity.length()
if vel_len > self.max_speed:
self.velocity = self.velocity.normalized() * self.max_speed
# ============================================================================
# NavigationObstacle3D — dynamic obstacle
# ============================================================================
[docs]
class NavigationObstacle3D(Node3D):
"""Dynamic obstacle that affects navigation agent avoidance.
Place in the scene tree to create areas that agents will steer around.
Does not carve the navmesh — instead, agents detect obstacles at runtime
and adjust their velocity to avoid collisions.
"""
radius = Property(1.0, range=(0.01, 100.0), hint="Avoidance radius")
height = Property(2.0, range=(0.01, 100.0), hint="Obstacle height")
[docs]
def enter_tree(self) -> None:
NavigationServer3D.get_singleton().register_obstacle(self)
[docs]
def exit_tree(self) -> None:
NavigationServer3D.get_singleton().unregister_obstacle(self)
# ============================================================================
# Helper functions
# ============================================================================
def _vec3_dist(a: Vec3, b: Vec3) -> float:
"""Euclidean distance between two Vec3."""
return (a - b).length()
def _path_length(path: list[Vec3]) -> float:
"""Total length of a path."""
total = 0.0
for a, b in pairwise(path):
total += _vec3_dist(a, b)
return total
def _point_in_triangle_xz(
px: float,
pz: float,
ax: float,
az: float,
bx: float,
bz: float,
cx: float,
cz: float,
) -> bool:
"""Test if point (px, pz) is inside triangle (a, b, c) in the xz plane using barycentric coordinates."""
v0x, v0z = cx - ax, cz - az
v1x, v1z = bx - ax, bz - az
v2x, v2z = px - ax, pz - az
dot00 = v0x * v0x + v0z * v0z
dot01 = v0x * v1x + v0z * v1z
dot02 = v0x * v2x + v0z * v2z
dot11 = v1x * v1x + v1z * v1z
dot12 = v1x * v2x + v1z * v2z
inv_denom = dot00 * dot11 - dot01 * dot01
if abs(inv_denom) < 1e-12:
return False
inv_denom = 1.0 / inv_denom
u = (dot11 * dot02 - dot01 * dot12) * inv_denom
v = (dot00 * dot12 - dot01 * dot02) * inv_denom
return u >= -1e-6 and v >= -1e-6 and (u + v) <= 1.0 + 1e-6
def _interpolate_height(
v0: np.ndarray,
v1: np.ndarray,
v2: np.ndarray,
x: float,
z: float,
) -> float | None:
"""Interpolate the Y height at (x, z) within triangle (v0, v1, v2). Returns None if outside."""
if not _point_in_triangle_xz(x, z, v0[0], v0[2], v1[0], v1[2], v2[0], v2[2]):
return None
# Barycentric interpolation for Y
denom = (v1[2] - v2[2]) * (v0[0] - v2[0]) + (v2[0] - v1[0]) * (v0[2] - v2[2])
if abs(denom) < 1e-12:
return float((v0[1] + v1[1] + v2[1]) / 3.0)
w0 = ((v1[2] - v2[2]) * (x - v2[0]) + (v2[0] - v1[0]) * (z - v2[2])) / denom
w1 = ((v2[2] - v0[2]) * (x - v2[0]) + (v0[0] - v2[0]) * (z - v2[2])) / denom
w2 = 1.0 - w0 - w1
return float(w0 * v0[1] + w1 * v1[1] + w2 * v2[1])
def _closest_point_on_triangle_np(
p: np.ndarray,
a: np.ndarray,
b: np.ndarray,
c: np.ndarray,
) -> np.ndarray:
"""Find closest point on triangle (a, b, c) to point p. Returns numpy array."""
ab = b - a
ac = c - a
ap = p - a
d1 = np.dot(ab, ap)
d2 = np.dot(ac, ap)
if d1 <= 0.0 and d2 <= 0.0:
return a.copy()
bp = p - b
d3 = np.dot(ab, bp)
d4 = np.dot(ac, bp)
if d3 >= 0.0 and d4 <= d3:
return b.copy()
cp_ = p - c
d5 = np.dot(ab, cp_)
d6 = np.dot(ac, cp_)
if d6 >= 0.0 and d5 <= d6:
return c.copy()
vc = d1 * d4 - d3 * d2
if vc <= 0.0 and d1 >= 0.0 and d3 <= 0.0:
v = d1 / (d1 - d3)
return a + v * ab
vb = d5 * d2 - d1 * d6
if vb <= 0.0 and d2 >= 0.0 and d6 <= 0.0:
w = d2 / (d2 - d6)
return a + w * ac
va = d3 * d6 - d5 * d4
if va <= 0.0 and (d4 - d3) >= 0.0 and (d5 - d6) >= 0.0:
w = (d4 - d3) / ((d4 - d3) + (d5 - d6))
return b + w * (c - b)
denom = 1.0 / (va + vb + vc)
v = vb * denom
w = vc * denom
return a + ab * v + ac * w
def _point_in_polygon_xz(px: float, pz: float, polygon: list[list[float]]) -> bool:
"""Ray-casting point-in-polygon test on the XZ plane.
Args:
px: X coordinate of the test point.
pz: Z coordinate of the test point.
polygon: List of [x, z] vertices defining the polygon boundary.
Returns:
True if the point is inside the polygon.
"""
n = len(polygon)
inside = False
j = n - 1
for i in range(n):
xi, zi = polygon[i]
xj, zj = polygon[j]
if ((zi > pz) != (zj > pz)) and (px < (xj - xi) * (pz - zi) / (zj - zi) + xi):
inside = not inside
j = i
return inside