Source code for simvx.core.physics._server

"""PhysicsServer: singleton world manager and internal body record.

Private leaf module: import via the ``simvx.core.physics`` facade.
"""

import logging
import math
from dataclasses import dataclass, field
from typing import Any

import numpy as np

from ..collision import (
    BoxShape,
    CollisionShape,
    RayHit,
    SpatialHashGrid,
    SphereShape,
    _aabb_overlap,
    _gjk_intersect,
)
from ..math.types import Vec2, Vec3
from ..nodes_2d.node2d import Node2D
from ..nodes_3d.node3d import Node3D
from ._material import BodyMode, Contact, PhysicsMaterial, _quat_to_matrix

log = logging.getLogger(__name__)

__all__ = [
    "PhysicsServer",
]

# ============================================================================
# Internal body record for PhysicsServer
# ============================================================================

@dataclass
class _PhysicsBody:
    """Internal bookkeeping for a registered physics body."""

    node: Any  # RigidBody2D or RigidBody3D
    shape: CollisionShape | None = None
    position: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=np.float32))
    rotation_matrix: np.ndarray = field(default_factory=lambda: np.eye(3, dtype=np.float32))
    aabb_min: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=np.float32))
    aabb_max: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=np.float32))

    def update_aabb(self):
        if not self.shape:
            return
        he = getattr(self.shape, 'half_extents', None)
        if he is not None and not np.allclose(self.rotation_matrix, np.eye(3)):
            # Rotated box: compute world-space AABB from oriented extents
            abs_rot = np.abs(self.rotation_matrix)
            world_he = abs_rot @ np.asarray(he, dtype=np.float32)
            self.aabb_min = self.position - world_he
            self.aabb_max = self.position + world_he
        else:
            self.aabb_min, self.aabb_max = self.shape.get_aabb(self.position)

# ============================================================================
# PhysicsServer: Singleton world manager
# ============================================================================

[docs] class PhysicsServer: """Central physics simulation manager. Manages all physics bodies, runs broadphase/narrowphase collision detection, resolves contacts with impulse-based response, and provides raycasting. Usage: server = PhysicsServer.get() server.gravity = Vec3(0, -9.8, 0) """ _instance: PhysicsServer | None = None def __init__(self, cell_size: float = 2.0): self._bodies: dict[int, _PhysicsBody] = {} self._joints: list = [] # Registered joint nodes self._gravity = np.array([0.0, -9.8, 0.0], dtype=np.float32) self._contacts: list[Contact] = [] self._prev_overlaps: set[tuple[int, int]] = set() self._spatial_hash = SpatialHashGrid(cell_size=cell_size) # Solver parameters self._solver_iterations = 8 self._joint_iterations = 20 # Dedicated joint constraint iterations self._position_correction_factor = 0.4 # Baumgarte stabilization self._slop = 0.005 # Allowed penetration before correction
[docs] @classmethod def get(cls) -> PhysicsServer: """Return the singleton PhysicsServer, creating it if needed.""" if cls._instance is None: cls._instance = cls() return cls._instance
[docs] @classmethod def reset(cls) -> None: """Reset the singleton (useful for tests).""" cls._instance = None
# ---- Configuration ---- @property def gravity(self) -> np.ndarray: """Global gravity vector. Default ``(0, -9.8, 0)``. Accepts a Vec3, tuple, or ndarray when set. """ return self._gravity.copy()
[docs] @gravity.setter def gravity(self, value) -> None: if isinstance(value, Vec3): self._gravity = np.array([value.x, value.y, value.z], dtype=np.float32) else: self._gravity = np.asarray(value, dtype=np.float32)
# ---- Body management ----
[docs] def add_body(self, body) -> None: """Register a physics body node with the server.""" bid = id(body) if bid in self._bodies: return pb = _PhysicsBody(node=body) self._sync_body(pb) self._bodies[bid] = pb
[docs] def remove_body(self, body) -> None: """Unregister a physics body from the server.""" bid = id(body) if bid in self._bodies: del self._bodies[bid] # Clean up overlap tracking self._prev_overlaps = {p for p in self._prev_overlaps if bid not in p}
[docs] def add_joint(self, joint) -> None: """Register a joint constraint with the server for solving during step().""" if joint not in self._joints: self._joints.append(joint)
[docs] def remove_joint(self, joint) -> None: """Unregister a joint constraint from the server.""" try: self._joints.remove(joint) except ValueError: pass
def _sync_body(self, pb: _PhysicsBody) -> None: """Synchronize internal body state from the node's current transform and shape.""" node = pb.node pos = self._get_position(node) pb.position = pos pb.shape = self._get_shape(node) # Sync rotation for oriented box collision if isinstance(node, Node3D) and hasattr(node, 'rotation') and hasattr(node.rotation, 'x'): pb.rotation_matrix = _quat_to_matrix(node.rotation) pb.update_aabb() def _get_position(self, node) -> np.ndarray: """Extract 3D position from a 2D or 3D node.""" if isinstance(node, Node3D): p = node.position return np.array([p.x, p.y, p.z], dtype=np.float32) elif isinstance(node, Node2D): p = node.position return np.array([p.x, p.y, 0.0], dtype=np.float32) return np.zeros(3, dtype=np.float32) def _set_position(self, node, pos: np.ndarray) -> None: """Write back a 3D position to a 2D or 3D node.""" if isinstance(node, Node3D): node.position = Vec3(float(pos[0]), float(pos[1]), float(pos[2])) elif isinstance(node, Node2D): node.position = Vec2(float(pos[0]), float(pos[1])) def _get_shape(self, node) -> CollisionShape | None: """Find the first collision shape attached to a body node.""" shape = getattr(node, "_physics_shape", None) if shape is not None: return shape return None def _get_velocity(self, node) -> np.ndarray: """Extract linear velocity as a 3D numpy array.""" v = node.linear_velocity if isinstance(v, Vec3): return np.array([v.x, v.y, v.z], dtype=np.float32) elif isinstance(v, Vec2): return np.array([v.x, v.y, 0.0], dtype=np.float32) elif isinstance(v, tuple | list): if len(v) == 2: return np.array([v[0], v[1], 0.0], dtype=np.float32) return np.array(v[:3], dtype=np.float32) return np.asarray(v, dtype=np.float32).flatten()[:3] def _set_velocity(self, node, vel: np.ndarray) -> None: """Write back linear velocity to the node.""" if isinstance(node, Node3D): node.linear_velocity = Vec3(float(vel[0]), float(vel[1]), float(vel[2])) elif isinstance(node, Node2D): node.linear_velocity = Vec2(float(vel[0]), float(vel[1])) def _get_angular_velocity(self, node) -> np.ndarray: """Extract angular velocity as a 3D numpy array.""" v = node.angular_velocity if isinstance(v, Vec3): return np.array([v.x, v.y, v.z], dtype=np.float32) elif isinstance(v, int | float): return np.array([0.0, 0.0, float(v)], dtype=np.float32) elif isinstance(v, Vec2): return np.array([0.0, 0.0, v.x], dtype=np.float32) return np.asarray(v, dtype=np.float32).flatten()[:3] def _set_angular_velocity(self, node, av: np.ndarray) -> None: """Write back angular velocity to the node.""" if isinstance(node, Node3D): node.angular_velocity = Vec3(float(av[0]), float(av[1]), float(av[2])) elif isinstance(node, Node2D): node.angular_velocity = float(av[2]) # ---- Simulation step ----
[docs] def step(self, dt: float) -> None: """Advance the physics simulation by dt seconds. This performs: 1. Sync positions from nodes 2. Apply gravity and forces → integrate velocities 3. Broadphase + narrowphase collision detection 4. Iterative impulse-based collision response 5. Integrate positions 6. Write results back to nodes 7. Emit body_entered/body_exited signals """ if dt <= 0 or not self._bodies: return bodies = list(self._bodies.values()) # 1. Sync positions from nodes for pb in bodies: self._sync_body(pb) # 2. Apply gravity and integrate velocities for dynamic bodies for pb in bodies: node = pb.node if node.mode != BodyMode.DYNAMIC: continue vel = self._get_velocity(node) av = self._get_angular_velocity(node) inv_mass = 1.0 / node.mass if node.mass > 0 else 0.0 # Gravity: flip Y for 2D nodes (screen space Y-down vs world Y-up) gravity = self._gravity.copy() if isinstance(node, Node2D): gravity[1] = -gravity[1] vel += gravity * node.gravity_scale * dt # Accumulated force force = np.asarray(node._accumulated_force, dtype=np.float32) vel += force * inv_mass * dt node._accumulated_force = np.zeros(3, dtype=np.float32) # Accumulated torque torque = np.asarray(node._accumulated_torque, dtype=np.float32) # Simplified moment of inertia (treat as unit sphere) inertia = node.mass * 0.4 # 2/5 * m * r^2 with r=1 if inertia > 0: av += torque / inertia * dt node._accumulated_torque = np.zeros(3, dtype=np.float32) # Damping vel *= max(0.0, 1.0 - node.linear_damp * dt) av *= max(0.0, 1.0 - node.angular_damp * dt) self._set_velocity(node, vel) self._set_angular_velocity(node, av) # 3. Collision detection self._contacts.clear() self._detect_collisions(bodies) # 4. Solve collision constraints (impulse-based) for _ in range(self._solver_iterations): for contact in self._contacts: self._solve_contact(contact, dt) # 4b. Solve joint constraints (velocity-level + position correction) for _ in range(self._joint_iterations): for joint in self._joints: if joint.body_a and joint.body_b: joint._solve_constraint(dt) # 5. Integrate positions for pb in bodies: node = pb.node if node.mode == BodyMode.STATIC: continue if node.mode == BodyMode.KINEMATIC: # Kinematic bodies move by their velocity but aren't affected by forces vel = self._get_velocity(node) pb.position += vel * dt self._set_position(node, pb.position) pb.update_aabb() continue # Dynamic bodies vel = self._get_velocity(node) pb.position += vel * dt self._set_position(node, pb.position) # Integrate rotation (simplified: axis-angle from angular velocity) av = self._get_angular_velocity(node) if isinstance(node, Node3D): av_mag = float(np.linalg.norm(av)) if av_mag > 1e-8: axis = av / av_mag angle = av_mag * dt node.rotation = node.rotation.rotate((float(axis[0]), float(axis[1]), float(axis[2])), angle) elif isinstance(node, Node2D): node.rotation += float(av[2]) * dt pb.update_aabb() # 5b. Post-integration joint position correction (snap bodies back to constraint manifold) for joint in self._joints: if joint.body_a and joint.body_b: joint._position_correct(dt) # 6. Position correction (push apart overlapping bodies) for contact in self._contacts: self._correct_position(contact) # 7. Emit enter/exit signals self._emit_overlap_signals()
# ---- Collision detection ---- def _detect_collisions(self, bodies: list[_PhysicsBody]) -> None: """Broadphase (spatial-hash) + narrowphase collision detection. Each step: clear the grid, insert every body's AABB keyed by its list index, then for each body query neighbouring cells and run narrowphase on candidate pairs where ``j > i`` (dedupes pairs: each processed once). """ grid = self._spatial_hash grid.clear() for i, pb in enumerate(bodies): if pb.shape is not None: grid.insert(i, pb.aabb_min, pb.aabb_max) for i, a in enumerate(bodies): if a.shape is None: continue for j in grid.query(a.aabb_min, a.aabb_max): if j <= i: continue # skip self and already-processed pairs b = bodies[j] # Skip static-static pairs if a.node.mode == BodyMode.STATIC and b.node.mode == BodyMode.STATIC: continue # Grid cells are loose; still need the precise AABB test if not _aabb_overlap(a.aabb_min, a.aabb_max, b.aabb_min, b.aabb_max): continue contact = self._narrow_phase(a, b) if contact is not None: self._contacts.append(contact) def _narrow_phase(self, a: _PhysicsBody, b: _PhysicsBody) -> Contact | None: """Compute contact data for two overlapping bodies. Uses analytical solutions for sphere-sphere, sphere-box, and box-box. Falls back to GJK for generic convex shapes. """ sa, sb = a.shape, b.shape # Sphere vs Sphere if isinstance(sa, SphereShape) and isinstance(sb, SphereShape): return self._sphere_sphere(a, b) # Sphere vs Box (order matters) if isinstance(sa, SphereShape) and isinstance(sb, BoxShape): return self._sphere_box(a, b, flip=False) if isinstance(sa, BoxShape) and isinstance(sb, SphereShape): return self._sphere_box(b, a, flip=True) # Box vs Box if isinstance(sa, BoxShape) and isinstance(sb, BoxShape): return self._box_box(a, b) # Generic GJK fallback if _gjk_intersect(sa, a.position, sb, b.position): # Approximate contact: direction between centers diff = b.position - a.position dist = float(np.linalg.norm(diff)) if dist < 1e-10: normal = np.array([0.0, 1.0, 0.0], dtype=np.float32) else: normal = diff / dist point = (a.position + b.position) * 0.5 return Contact(body_a=a.node, body_b=b.node, normal=normal, point=point, depth=0.01) return None def _sphere_sphere(self, a: _PhysicsBody, b: _PhysicsBody) -> Contact | None: """Analytical sphere-sphere collision.""" diff = b.position - a.position dist = float(np.linalg.norm(diff)) ra, rb = a.shape.radius, b.shape.radius overlap = ra + rb - dist if overlap <= 0: return None if dist < 1e-10: normal = np.array([0.0, 1.0, 0.0], dtype=np.float32) else: normal = diff / dist point = a.position + normal * ra return Contact(body_a=a.node, body_b=b.node, normal=normal, point=point, depth=overlap) def _sphere_box(self, sphere_pb: _PhysicsBody, box_pb: _PhysicsBody, flip: bool) -> Contact | None: """Analytical sphere-box collision with oriented box support.""" radius = sphere_pb.shape.radius he = np.asarray(box_pb.shape.half_extents, dtype=np.float32) rot = box_pb.rotation_matrix # 3x3 rotation matrix rot_t = rot.T # inverse rotation (transpose of orthonormal matrix) # Transform sphere center into box's local coordinate system diff_world = sphere_pb.position - box_pb.position diff_local = rot_t @ diff_world # Closest point on axis-aligned box in local space clamped = np.clip(diff_local, -he, he) to_sphere_local = diff_local - clamped dist = float(np.linalg.norm(to_sphere_local)) if dist > radius: return None overlap = radius - dist if dist < 1e-10: # Sphere center is inside box: push along smallest penetration axis abs_diff = np.abs(diff_local) axis = int(np.argmin(he - abs_diff)) normal_local = np.zeros(3, dtype=np.float32) normal_local[axis] = 1.0 if diff_local[axis] >= 0 else -1.0 overlap = float(he[axis] - abs_diff[axis]) + radius else: normal_local = to_sphere_local / dist # Transform normal and closest point back to world space normal = rot @ normal_local closest = box_pb.position + rot @ clamped # Normal currently points from box surface toward sphere. The solver # convention (matching sphere-sphere/box-box) expects normal from body_a # toward body_b, so negate when sphere is body_a and keep when box is body_a. if flip: a_node, b_node = box_pb.node, sphere_pb.node else: a_node, b_node = sphere_pb.node, box_pb.node normal = -normal return Contact(body_a=a_node, body_b=b_node, normal=normal, point=closest, depth=overlap) def _box_box(self, a: _PhysicsBody, b: _PhysicsBody) -> Contact | None: """Axis-aligned box-box collision; contact is the minimum-penetration axis.""" he_a = a.shape.half_extents he_b = b.shape.half_extents diff = b.position - a.position overlap = (he_a + he_b) - np.abs(diff) if np.any(overlap <= 0): return None # Minimum penetration axis axis = int(np.argmin(overlap)) depth = float(overlap[axis]) normal = np.zeros(3, dtype=np.float32) normal[axis] = 1.0 if diff[axis] >= 0 else -1.0 point = (a.position + b.position) * 0.5 return Contact(body_a=a.node, body_b=b.node, normal=normal, point=point, depth=depth) # ---- Impulse solver ---- def _solve_contact(self, contact: Contact, dt: float) -> None: """Apply impulse-based collision response for a single contact.""" a, b = contact.body_a, contact.body_b normal = contact.normal inv_mass_a = (1.0 / a.mass) if a.mode == BodyMode.DYNAMIC and a.mass > 0 else 0.0 inv_mass_b = (1.0 / b.mass) if b.mode == BodyMode.DYNAMIC and b.mass > 0 else 0.0 if inv_mass_a == 0 and inv_mass_b == 0: return # Both immovable vel_a = self._get_velocity(a) vel_b = self._get_velocity(b) relative_vel = vel_b - vel_a vel_along_normal = float(np.dot(relative_vel, normal)) # Don't resolve if velocities are separating if vel_along_normal > 0: return # Restitution: use minimum of the two materials mat_a = a.physics_material if hasattr(a, "physics_material") and a.physics_material else PhysicsMaterial() mat_b = b.physics_material if hasattr(b, "physics_material") and b.physics_material else PhysicsMaterial() restitution = min(mat_a.restitution, mat_b.restitution) # Normal impulse magnitude j = -(1.0 + restitution) * vel_along_normal j /= inv_mass_a + inv_mass_b impulse = normal * j if a.mode == BodyMode.DYNAMIC: vel_a -= impulse * inv_mass_a self._set_velocity(a, vel_a) if b.mode == BodyMode.DYNAMIC: vel_b += impulse * inv_mass_b self._set_velocity(b, vel_b) # Friction impulse (tangential) relative_vel = vel_b - vel_a # Recompute after normal impulse tangent = relative_vel - normal * float(np.dot(relative_vel, normal)) t_len = float(np.linalg.norm(tangent)) if t_len < 1e-10: return tangent /= t_len friction = math.sqrt(mat_a.friction * mat_b.friction) # Geometric mean jt = -float(np.dot(relative_vel, tangent)) jt /= inv_mass_a + inv_mass_b # Coulomb friction clamp if abs(jt) > abs(j) * friction: jt = abs(j) * friction * (1.0 if jt > 0 else -1.0) friction_impulse = tangent * jt if a.mode == BodyMode.DYNAMIC: self._set_velocity(a, self._get_velocity(a) - friction_impulse * inv_mass_a) if b.mode == BodyMode.DYNAMIC: self._set_velocity(b, self._get_velocity(b) + friction_impulse * inv_mass_b) def _correct_position(self, contact: Contact) -> None: """Baumgarte position correction to prevent sinking.""" a, b = contact.body_a, contact.body_b inv_mass_a = (1.0 / a.mass) if a.mode == BodyMode.DYNAMIC and a.mass > 0 else 0.0 inv_mass_b = (1.0 / b.mass) if b.mode == BodyMode.DYNAMIC and b.mass > 0 else 0.0 total_inv = inv_mass_a + inv_mass_b if total_inv == 0: return correction_mag = max(contact.depth - self._slop, 0.0) * self._position_correction_factor / total_inv correction = contact.normal * correction_mag if a.mode == BodyMode.DYNAMIC: pb_a = self._bodies.get(id(a)) if pb_a: pb_a.position -= correction * inv_mass_a self._set_position(a, pb_a.position) pb_a.update_aabb() if b.mode == BodyMode.DYNAMIC: pb_b = self._bodies.get(id(b)) if pb_b: pb_b.position += correction * inv_mass_b self._set_position(b, pb_b.position) pb_b.update_aabb() # ---- Overlap signals ---- def _emit_overlap_signals(self) -> None: """Emit body_entered/body_exited signals based on current contacts.""" current_overlaps: set[tuple[int, int]] = set() for contact in self._contacts: pair = (id(contact.body_a), id(contact.body_b)) pair = (min(pair), max(pair)) # Canonical order current_overlaps.add(pair) # New overlaps → body_entered for pair in current_overlaps - self._prev_overlaps: a_id, b_id = pair a_pb = self._bodies.get(a_id) b_pb = self._bodies.get(b_id) if a_pb and b_pb: if hasattr(a_pb.node, "body_entered"): a_pb.node.body_entered(b_pb.node) if hasattr(b_pb.node, "body_entered"): b_pb.node.body_entered(a_pb.node) # Lost overlaps → body_exited for pair in self._prev_overlaps - current_overlaps: a_id, b_id = pair a_pb = self._bodies.get(a_id) b_pb = self._bodies.get(b_id) if a_pb and b_pb: if hasattr(a_pb.node, "body_exited"): a_pb.node.body_exited(b_pb.node) if hasattr(b_pb.node, "body_exited"): b_pb.node.body_exited(a_pb.node) self._prev_overlaps = current_overlaps # ---- Queries ----
[docs] def raycast( self, origin, direction, max_dist: float = 1000.0, layer_mask: int = 0xFFFFFFFF, ) -> list[RayHit]: """Cast a ray through the physics world. Args: origin: Ray start position (Vec3, tuple, or ndarray). direction: Ray direction (will be normalized). max_dist: Maximum ray length. layer_mask: Bitmask to filter bodies by collision layer. Returns: List of RayHit sorted by distance. """ from ..collision import _ray_aabb, _ray_sphere, _surface_normal if isinstance(origin, Vec2 | Vec3): origin = np.array(list(origin), dtype=np.float32) if len(origin) == 2: origin = np.array([origin[0], origin[1], 0.0], dtype=np.float32) else: origin = np.asarray(origin, dtype=np.float32) if isinstance(direction, Vec2 | Vec3): direction = np.array(list(direction), dtype=np.float32) if len(direction) == 2: direction = np.array([direction[0], direction[1], 0.0], dtype=np.float32) else: direction = np.asarray(direction, dtype=np.float32) d_len = float(np.linalg.norm(direction)) if d_len < 1e-10: return [] direction = direction / d_len hits: list[RayHit] = [] for pb in self._bodies.values(): node = pb.node layer = getattr(node, "collision_layer", 1) if not (layer_mask & layer): continue if pb.shape is None: continue # AABB pre-test t_aabb = _ray_aabb(origin, direction, pb.aabb_min, pb.aabb_max, max_dist) if t_aabb is None: continue # Shape-specific test if isinstance(pb.shape, SphereShape): t = _ray_sphere(origin, direction, pb.position, pb.shape.radius, max_dist) elif isinstance(pb.shape, BoxShape): bmin = pb.position - pb.shape.half_extents bmax = pb.position + pb.shape.half_extents t = _ray_aabb(origin, direction, bmin, bmax, max_dist) else: t = t_aabb if t is not None: hit_point = origin + direction * t hits.append(RayHit( body=node, point=hit_point, distance=t, normal=_surface_normal(hit_point, pb.shape, pb.position), )) hits.sort(key=lambda h: h.distance) return hits
[docs] def get_overlapping_bodies(self, body) -> list: """Return all bodies currently overlapping the given body.""" bid = id(body) result = [] for contact in self._contacts: if id(contact.body_a) == bid: result.append(contact.body_b) elif id(contact.body_b) == bid: result.append(contact.body_a) return result
[docs] @property def body_count(self) -> int: return len(self._bodies)