"""PhysicsServer: singleton world manager and internal body record.
Private leaf module: import via the ``simvx.core.physics`` facade.
"""
import logging
import math
from dataclasses import dataclass, field
from typing import Any
import numpy as np
from ..collision import (
BoxShape,
CollisionShape,
RayHit,
SpatialHashGrid,
SphereShape,
_aabb_overlap,
_gjk_intersect,
)
from ..math.types import Vec2, Vec3
from ..nodes_2d.node2d import Node2D
from ..nodes_3d.node3d import Node3D
from ._material import BodyMode, Contact, PhysicsMaterial, _quat_to_matrix
log = logging.getLogger(__name__)
__all__ = [
"PhysicsServer",
]
# ============================================================================
# Internal body record for PhysicsServer
# ============================================================================
@dataclass
class _PhysicsBody:
"""Internal bookkeeping for a registered physics body."""
node: Any # RigidBody2D or RigidBody3D
shape: CollisionShape | None = None
position: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=np.float32))
rotation_matrix: np.ndarray = field(default_factory=lambda: np.eye(3, dtype=np.float32))
aabb_min: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=np.float32))
aabb_max: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=np.float32))
def update_aabb(self):
if not self.shape:
return
he = getattr(self.shape, 'half_extents', None)
if he is not None and not np.allclose(self.rotation_matrix, np.eye(3)):
# Rotated box: compute world-space AABB from oriented extents
abs_rot = np.abs(self.rotation_matrix)
world_he = abs_rot @ np.asarray(he, dtype=np.float32)
self.aabb_min = self.position - world_he
self.aabb_max = self.position + world_he
else:
self.aabb_min, self.aabb_max = self.shape.get_aabb(self.position)
# ============================================================================
# PhysicsServer: Singleton world manager
# ============================================================================
[docs]
class PhysicsServer:
"""Central physics simulation manager.
Manages all physics bodies, runs broadphase/narrowphase collision detection,
resolves contacts with impulse-based response, and provides raycasting.
Usage:
server = PhysicsServer.get()
server.gravity = Vec3(0, -9.8, 0)
"""
_instance: PhysicsServer | None = None
def __init__(self, cell_size: float = 2.0):
self._bodies: dict[int, _PhysicsBody] = {}
self._joints: list = [] # Registered joint nodes
self._gravity = np.array([0.0, -9.8, 0.0], dtype=np.float32)
self._contacts: list[Contact] = []
self._prev_overlaps: set[tuple[int, int]] = set()
self._spatial_hash = SpatialHashGrid(cell_size=cell_size)
# Solver parameters
self._solver_iterations = 8
self._joint_iterations = 20 # Dedicated joint constraint iterations
self._position_correction_factor = 0.4 # Baumgarte stabilization
self._slop = 0.005 # Allowed penetration before correction
[docs]
@classmethod
def get(cls) -> PhysicsServer:
"""Return the singleton PhysicsServer, creating it if needed."""
if cls._instance is None:
cls._instance = cls()
return cls._instance
[docs]
@classmethod
def reset(cls) -> None:
"""Reset the singleton (useful for tests)."""
cls._instance = None
# ---- Configuration ----
@property
def gravity(self) -> np.ndarray:
"""Global gravity vector. Default ``(0, -9.8, 0)``.
Accepts a Vec3, tuple, or ndarray when set.
"""
return self._gravity.copy()
[docs]
@gravity.setter
def gravity(self, value) -> None:
if isinstance(value, Vec3):
self._gravity = np.array([value.x, value.y, value.z], dtype=np.float32)
else:
self._gravity = np.asarray(value, dtype=np.float32)
# ---- Body management ----
[docs]
def add_body(self, body) -> None:
"""Register a physics body node with the server."""
bid = id(body)
if bid in self._bodies:
return
pb = _PhysicsBody(node=body)
self._sync_body(pb)
self._bodies[bid] = pb
[docs]
def remove_body(self, body) -> None:
"""Unregister a physics body from the server."""
bid = id(body)
if bid in self._bodies:
del self._bodies[bid]
# Clean up overlap tracking
self._prev_overlaps = {p for p in self._prev_overlaps if bid not in p}
[docs]
def add_joint(self, joint) -> None:
"""Register a joint constraint with the server for solving during step()."""
if joint not in self._joints:
self._joints.append(joint)
[docs]
def remove_joint(self, joint) -> None:
"""Unregister a joint constraint from the server."""
try:
self._joints.remove(joint)
except ValueError:
pass
def _sync_body(self, pb: _PhysicsBody) -> None:
"""Synchronize internal body state from the node's current transform and shape."""
node = pb.node
pos = self._get_position(node)
pb.position = pos
pb.shape = self._get_shape(node)
# Sync rotation for oriented box collision
if isinstance(node, Node3D) and hasattr(node, 'rotation') and hasattr(node.rotation, 'x'):
pb.rotation_matrix = _quat_to_matrix(node.rotation)
pb.update_aabb()
def _get_position(self, node) -> np.ndarray:
"""Extract 3D position from a 2D or 3D node."""
if isinstance(node, Node3D):
p = node.position
return np.array([p.x, p.y, p.z], dtype=np.float32)
elif isinstance(node, Node2D):
p = node.position
return np.array([p.x, p.y, 0.0], dtype=np.float32)
return np.zeros(3, dtype=np.float32)
def _set_position(self, node, pos: np.ndarray) -> None:
"""Write back a 3D position to a 2D or 3D node."""
if isinstance(node, Node3D):
node.position = Vec3(float(pos[0]), float(pos[1]), float(pos[2]))
elif isinstance(node, Node2D):
node.position = Vec2(float(pos[0]), float(pos[1]))
def _get_shape(self, node) -> CollisionShape | None:
"""Find the first collision shape attached to a body node."""
shape = getattr(node, "_physics_shape", None)
if shape is not None:
return shape
return None
def _get_velocity(self, node) -> np.ndarray:
"""Extract linear velocity as a 3D numpy array."""
v = node.linear_velocity
if isinstance(v, Vec3):
return np.array([v.x, v.y, v.z], dtype=np.float32)
elif isinstance(v, Vec2):
return np.array([v.x, v.y, 0.0], dtype=np.float32)
elif isinstance(v, tuple | list):
if len(v) == 2:
return np.array([v[0], v[1], 0.0], dtype=np.float32)
return np.array(v[:3], dtype=np.float32)
return np.asarray(v, dtype=np.float32).flatten()[:3]
def _set_velocity(self, node, vel: np.ndarray) -> None:
"""Write back linear velocity to the node."""
if isinstance(node, Node3D):
node.linear_velocity = Vec3(float(vel[0]), float(vel[1]), float(vel[2]))
elif isinstance(node, Node2D):
node.linear_velocity = Vec2(float(vel[0]), float(vel[1]))
def _get_angular_velocity(self, node) -> np.ndarray:
"""Extract angular velocity as a 3D numpy array."""
v = node.angular_velocity
if isinstance(v, Vec3):
return np.array([v.x, v.y, v.z], dtype=np.float32)
elif isinstance(v, int | float):
return np.array([0.0, 0.0, float(v)], dtype=np.float32)
elif isinstance(v, Vec2):
return np.array([0.0, 0.0, v.x], dtype=np.float32)
return np.asarray(v, dtype=np.float32).flatten()[:3]
def _set_angular_velocity(self, node, av: np.ndarray) -> None:
"""Write back angular velocity to the node."""
if isinstance(node, Node3D):
node.angular_velocity = Vec3(float(av[0]), float(av[1]), float(av[2]))
elif isinstance(node, Node2D):
node.angular_velocity = float(av[2])
# ---- Simulation step ----
[docs]
def step(self, dt: float) -> None:
"""Advance the physics simulation by dt seconds.
This performs:
1. Sync positions from nodes
2. Apply gravity and forces → integrate velocities
3. Broadphase + narrowphase collision detection
4. Iterative impulse-based collision response
5. Integrate positions
6. Write results back to nodes
7. Emit body_entered/body_exited signals
"""
if dt <= 0 or not self._bodies:
return
bodies = list(self._bodies.values())
# 1. Sync positions from nodes
for pb in bodies:
self._sync_body(pb)
# 2. Apply gravity and integrate velocities for dynamic bodies
for pb in bodies:
node = pb.node
if node.mode != BodyMode.DYNAMIC:
continue
vel = self._get_velocity(node)
av = self._get_angular_velocity(node)
inv_mass = 1.0 / node.mass if node.mass > 0 else 0.0
# Gravity: flip Y for 2D nodes (screen space Y-down vs world Y-up)
gravity = self._gravity.copy()
if isinstance(node, Node2D):
gravity[1] = -gravity[1]
vel += gravity * node.gravity_scale * dt
# Accumulated force
force = np.asarray(node._accumulated_force, dtype=np.float32)
vel += force * inv_mass * dt
node._accumulated_force = np.zeros(3, dtype=np.float32)
# Accumulated torque
torque = np.asarray(node._accumulated_torque, dtype=np.float32)
# Simplified moment of inertia (treat as unit sphere)
inertia = node.mass * 0.4 # 2/5 * m * r^2 with r=1
if inertia > 0:
av += torque / inertia * dt
node._accumulated_torque = np.zeros(3, dtype=np.float32)
# Damping
vel *= max(0.0, 1.0 - node.linear_damp * dt)
av *= max(0.0, 1.0 - node.angular_damp * dt)
self._set_velocity(node, vel)
self._set_angular_velocity(node, av)
# 3. Collision detection
self._contacts.clear()
self._detect_collisions(bodies)
# 4. Solve collision constraints (impulse-based)
for _ in range(self._solver_iterations):
for contact in self._contacts:
self._solve_contact(contact, dt)
# 4b. Solve joint constraints (velocity-level + position correction)
for _ in range(self._joint_iterations):
for joint in self._joints:
if joint.body_a and joint.body_b:
joint._solve_constraint(dt)
# 5. Integrate positions
for pb in bodies:
node = pb.node
if node.mode == BodyMode.STATIC:
continue
if node.mode == BodyMode.KINEMATIC:
# Kinematic bodies move by their velocity but aren't affected by forces
vel = self._get_velocity(node)
pb.position += vel * dt
self._set_position(node, pb.position)
pb.update_aabb()
continue
# Dynamic bodies
vel = self._get_velocity(node)
pb.position += vel * dt
self._set_position(node, pb.position)
# Integrate rotation (simplified: axis-angle from angular velocity)
av = self._get_angular_velocity(node)
if isinstance(node, Node3D):
av_mag = float(np.linalg.norm(av))
if av_mag > 1e-8:
axis = av / av_mag
angle = av_mag * dt
node.rotation = node.rotation.rotate((float(axis[0]), float(axis[1]), float(axis[2])), angle)
elif isinstance(node, Node2D):
node.rotation += float(av[2]) * dt
pb.update_aabb()
# 5b. Post-integration joint position correction (snap bodies back to constraint manifold)
for joint in self._joints:
if joint.body_a and joint.body_b:
joint._position_correct(dt)
# 6. Position correction (push apart overlapping bodies)
for contact in self._contacts:
self._correct_position(contact)
# 7. Emit enter/exit signals
self._emit_overlap_signals()
# ---- Collision detection ----
def _detect_collisions(self, bodies: list[_PhysicsBody]) -> None:
"""Broadphase (spatial-hash) + narrowphase collision detection.
Each step: clear the grid, insert every body's AABB keyed by its list
index, then for each body query neighbouring cells and run narrowphase
on candidate pairs where ``j > i`` (dedupes pairs: each processed once).
"""
grid = self._spatial_hash
grid.clear()
for i, pb in enumerate(bodies):
if pb.shape is not None:
grid.insert(i, pb.aabb_min, pb.aabb_max)
for i, a in enumerate(bodies):
if a.shape is None:
continue
for j in grid.query(a.aabb_min, a.aabb_max):
if j <= i:
continue # skip self and already-processed pairs
b = bodies[j]
# Skip static-static pairs
if a.node.mode == BodyMode.STATIC and b.node.mode == BodyMode.STATIC:
continue
# Grid cells are loose; still need the precise AABB test
if not _aabb_overlap(a.aabb_min, a.aabb_max, b.aabb_min, b.aabb_max):
continue
contact = self._narrow_phase(a, b)
if contact is not None:
self._contacts.append(contact)
def _narrow_phase(self, a: _PhysicsBody, b: _PhysicsBody) -> Contact | None:
"""Compute contact data for two overlapping bodies.
Uses analytical solutions for sphere-sphere, sphere-box, and box-box.
Falls back to GJK for generic convex shapes.
"""
sa, sb = a.shape, b.shape
# Sphere vs Sphere
if isinstance(sa, SphereShape) and isinstance(sb, SphereShape):
return self._sphere_sphere(a, b)
# Sphere vs Box (order matters)
if isinstance(sa, SphereShape) and isinstance(sb, BoxShape):
return self._sphere_box(a, b, flip=False)
if isinstance(sa, BoxShape) and isinstance(sb, SphereShape):
return self._sphere_box(b, a, flip=True)
# Box vs Box
if isinstance(sa, BoxShape) and isinstance(sb, BoxShape):
return self._box_box(a, b)
# Generic GJK fallback
if _gjk_intersect(sa, a.position, sb, b.position):
# Approximate contact: direction between centers
diff = b.position - a.position
dist = float(np.linalg.norm(diff))
if dist < 1e-10:
normal = np.array([0.0, 1.0, 0.0], dtype=np.float32)
else:
normal = diff / dist
point = (a.position + b.position) * 0.5
return Contact(body_a=a.node, body_b=b.node, normal=normal, point=point, depth=0.01)
return None
def _sphere_sphere(self, a: _PhysicsBody, b: _PhysicsBody) -> Contact | None:
"""Analytical sphere-sphere collision."""
diff = b.position - a.position
dist = float(np.linalg.norm(diff))
ra, rb = a.shape.radius, b.shape.radius
overlap = ra + rb - dist
if overlap <= 0:
return None
if dist < 1e-10:
normal = np.array([0.0, 1.0, 0.0], dtype=np.float32)
else:
normal = diff / dist
point = a.position + normal * ra
return Contact(body_a=a.node, body_b=b.node, normal=normal, point=point, depth=overlap)
def _sphere_box(self, sphere_pb: _PhysicsBody, box_pb: _PhysicsBody, flip: bool) -> Contact | None:
"""Analytical sphere-box collision with oriented box support."""
radius = sphere_pb.shape.radius
he = np.asarray(box_pb.shape.half_extents, dtype=np.float32)
rot = box_pb.rotation_matrix # 3x3 rotation matrix
rot_t = rot.T # inverse rotation (transpose of orthonormal matrix)
# Transform sphere center into box's local coordinate system
diff_world = sphere_pb.position - box_pb.position
diff_local = rot_t @ diff_world
# Closest point on axis-aligned box in local space
clamped = np.clip(diff_local, -he, he)
to_sphere_local = diff_local - clamped
dist = float(np.linalg.norm(to_sphere_local))
if dist > radius:
return None
overlap = radius - dist
if dist < 1e-10:
# Sphere center is inside box: push along smallest penetration axis
abs_diff = np.abs(diff_local)
axis = int(np.argmin(he - abs_diff))
normal_local = np.zeros(3, dtype=np.float32)
normal_local[axis] = 1.0 if diff_local[axis] >= 0 else -1.0
overlap = float(he[axis] - abs_diff[axis]) + radius
else:
normal_local = to_sphere_local / dist
# Transform normal and closest point back to world space
normal = rot @ normal_local
closest = box_pb.position + rot @ clamped
# Normal currently points from box surface toward sphere. The solver
# convention (matching sphere-sphere/box-box) expects normal from body_a
# toward body_b, so negate when sphere is body_a and keep when box is body_a.
if flip:
a_node, b_node = box_pb.node, sphere_pb.node
else:
a_node, b_node = sphere_pb.node, box_pb.node
normal = -normal
return Contact(body_a=a_node, body_b=b_node, normal=normal, point=closest, depth=overlap)
def _box_box(self, a: _PhysicsBody, b: _PhysicsBody) -> Contact | None:
"""Axis-aligned box-box collision; contact is the minimum-penetration axis."""
he_a = a.shape.half_extents
he_b = b.shape.half_extents
diff = b.position - a.position
overlap = (he_a + he_b) - np.abs(diff)
if np.any(overlap <= 0):
return None
# Minimum penetration axis
axis = int(np.argmin(overlap))
depth = float(overlap[axis])
normal = np.zeros(3, dtype=np.float32)
normal[axis] = 1.0 if diff[axis] >= 0 else -1.0
point = (a.position + b.position) * 0.5
return Contact(body_a=a.node, body_b=b.node, normal=normal, point=point, depth=depth)
# ---- Impulse solver ----
def _solve_contact(self, contact: Contact, dt: float) -> None:
"""Apply impulse-based collision response for a single contact."""
a, b = contact.body_a, contact.body_b
normal = contact.normal
inv_mass_a = (1.0 / a.mass) if a.mode == BodyMode.DYNAMIC and a.mass > 0 else 0.0
inv_mass_b = (1.0 / b.mass) if b.mode == BodyMode.DYNAMIC and b.mass > 0 else 0.0
if inv_mass_a == 0 and inv_mass_b == 0:
return # Both immovable
vel_a = self._get_velocity(a)
vel_b = self._get_velocity(b)
relative_vel = vel_b - vel_a
vel_along_normal = float(np.dot(relative_vel, normal))
# Don't resolve if velocities are separating
if vel_along_normal > 0:
return
# Restitution: use minimum of the two materials
mat_a = a.physics_material if hasattr(a, "physics_material") and a.physics_material else PhysicsMaterial()
mat_b = b.physics_material if hasattr(b, "physics_material") and b.physics_material else PhysicsMaterial()
restitution = min(mat_a.restitution, mat_b.restitution)
# Normal impulse magnitude
j = -(1.0 + restitution) * vel_along_normal
j /= inv_mass_a + inv_mass_b
impulse = normal * j
if a.mode == BodyMode.DYNAMIC:
vel_a -= impulse * inv_mass_a
self._set_velocity(a, vel_a)
if b.mode == BodyMode.DYNAMIC:
vel_b += impulse * inv_mass_b
self._set_velocity(b, vel_b)
# Friction impulse (tangential)
relative_vel = vel_b - vel_a # Recompute after normal impulse
tangent = relative_vel - normal * float(np.dot(relative_vel, normal))
t_len = float(np.linalg.norm(tangent))
if t_len < 1e-10:
return
tangent /= t_len
friction = math.sqrt(mat_a.friction * mat_b.friction) # Geometric mean
jt = -float(np.dot(relative_vel, tangent))
jt /= inv_mass_a + inv_mass_b
# Coulomb friction clamp
if abs(jt) > abs(j) * friction:
jt = abs(j) * friction * (1.0 if jt > 0 else -1.0)
friction_impulse = tangent * jt
if a.mode == BodyMode.DYNAMIC:
self._set_velocity(a, self._get_velocity(a) - friction_impulse * inv_mass_a)
if b.mode == BodyMode.DYNAMIC:
self._set_velocity(b, self._get_velocity(b) + friction_impulse * inv_mass_b)
def _correct_position(self, contact: Contact) -> None:
"""Baumgarte position correction to prevent sinking."""
a, b = contact.body_a, contact.body_b
inv_mass_a = (1.0 / a.mass) if a.mode == BodyMode.DYNAMIC and a.mass > 0 else 0.0
inv_mass_b = (1.0 / b.mass) if b.mode == BodyMode.DYNAMIC and b.mass > 0 else 0.0
total_inv = inv_mass_a + inv_mass_b
if total_inv == 0:
return
correction_mag = max(contact.depth - self._slop, 0.0) * self._position_correction_factor / total_inv
correction = contact.normal * correction_mag
if a.mode == BodyMode.DYNAMIC:
pb_a = self._bodies.get(id(a))
if pb_a:
pb_a.position -= correction * inv_mass_a
self._set_position(a, pb_a.position)
pb_a.update_aabb()
if b.mode == BodyMode.DYNAMIC:
pb_b = self._bodies.get(id(b))
if pb_b:
pb_b.position += correction * inv_mass_b
self._set_position(b, pb_b.position)
pb_b.update_aabb()
# ---- Overlap signals ----
def _emit_overlap_signals(self) -> None:
"""Emit body_entered/body_exited signals based on current contacts."""
current_overlaps: set[tuple[int, int]] = set()
for contact in self._contacts:
pair = (id(contact.body_a), id(contact.body_b))
pair = (min(pair), max(pair)) # Canonical order
current_overlaps.add(pair)
# New overlaps → body_entered
for pair in current_overlaps - self._prev_overlaps:
a_id, b_id = pair
a_pb = self._bodies.get(a_id)
b_pb = self._bodies.get(b_id)
if a_pb and b_pb:
if hasattr(a_pb.node, "body_entered"):
a_pb.node.body_entered(b_pb.node)
if hasattr(b_pb.node, "body_entered"):
b_pb.node.body_entered(a_pb.node)
# Lost overlaps → body_exited
for pair in self._prev_overlaps - current_overlaps:
a_id, b_id = pair
a_pb = self._bodies.get(a_id)
b_pb = self._bodies.get(b_id)
if a_pb and b_pb:
if hasattr(a_pb.node, "body_exited"):
a_pb.node.body_exited(b_pb.node)
if hasattr(b_pb.node, "body_exited"):
b_pb.node.body_exited(a_pb.node)
self._prev_overlaps = current_overlaps
# ---- Queries ----
[docs]
def raycast(
self,
origin,
direction,
max_dist: float = 1000.0,
layer_mask: int = 0xFFFFFFFF,
) -> list[RayHit]:
"""Cast a ray through the physics world.
Args:
origin: Ray start position (Vec3, tuple, or ndarray).
direction: Ray direction (will be normalized).
max_dist: Maximum ray length.
layer_mask: Bitmask to filter bodies by collision layer.
Returns:
List of RayHit sorted by distance.
"""
from ..collision import _ray_aabb, _ray_sphere, _surface_normal
if isinstance(origin, Vec2 | Vec3):
origin = np.array(list(origin), dtype=np.float32)
if len(origin) == 2:
origin = np.array([origin[0], origin[1], 0.0], dtype=np.float32)
else:
origin = np.asarray(origin, dtype=np.float32)
if isinstance(direction, Vec2 | Vec3):
direction = np.array(list(direction), dtype=np.float32)
if len(direction) == 2:
direction = np.array([direction[0], direction[1], 0.0], dtype=np.float32)
else:
direction = np.asarray(direction, dtype=np.float32)
d_len = float(np.linalg.norm(direction))
if d_len < 1e-10:
return []
direction = direction / d_len
hits: list[RayHit] = []
for pb in self._bodies.values():
node = pb.node
layer = getattr(node, "collision_layer", 1)
if not (layer_mask & layer):
continue
if pb.shape is None:
continue
# AABB pre-test
t_aabb = _ray_aabb(origin, direction, pb.aabb_min, pb.aabb_max, max_dist)
if t_aabb is None:
continue
# Shape-specific test
if isinstance(pb.shape, SphereShape):
t = _ray_sphere(origin, direction, pb.position, pb.shape.radius, max_dist)
elif isinstance(pb.shape, BoxShape):
bmin = pb.position - pb.shape.half_extents
bmax = pb.position + pb.shape.half_extents
t = _ray_aabb(origin, direction, bmin, bmax, max_dist)
else:
t = t_aabb
if t is not None:
hit_point = origin + direction * t
hits.append(RayHit(
body=node, point=hit_point, distance=t,
normal=_surface_normal(hit_point, pb.shape, pb.position),
))
hits.sort(key=lambda h: h.distance)
return hits
[docs]
def get_overlapping_bodies(self, body) -> list:
"""Return all bodies currently overlapping the given body."""
bid = id(body)
result = []
for contact in self._contacts:
if id(contact.body_a) == bid:
result.append(contact.body_b)
elif id(contact.body_b) == bid:
result.append(contact.body_a)
return result
[docs]
@property
def body_count(self) -> int:
return len(self._bodies)