"""Matrix helpers, coroutine helpers, easing functions, and raycasting utilities."""
from __future__ import annotations
import logging
import math
import time
from collections.abc import Callable
from typing import TYPE_CHECKING
import numpy as np
from .descriptors import Coroutine
from .math.types import Vec3
if TYPE_CHECKING:
from .descriptors import Signal
log = logging.getLogger(__name__)
# ============================================================================
# NumPy Matrix helpers
# ============================================================================
[docs]
def mat4_from_trs(
    pos: tuple[float, float, float] | np.ndarray,
    rot,
    scl: tuple[float, float, float] | np.ndarray,
) -> np.ndarray:
    """Compose a model matrix from a translation, a rotation, and a scale.

    Args:
        pos: Position (x, y, z).
        rot: Rotation quaternion — Quat or any object with .w/.x/.y/.z.
        scl: Scale (x, y, z).

    Returns:
        The matrix product ``translate(pos) @ quat_to_mat4(rot) @ scale(scl)``,
        i.e. Translate * Rotate * Scale, as a numpy array.
    """
    # The matrix builders are imported at call time, not at module import.
    from .math import quat_to_mat4, scale, translate

    translation = translate(pos)
    rotation = quat_to_mat4(rot)
    scaling = scale(scl)
    return translation @ rotation @ scaling
[docs]
def batch_mat4_from_trs(
    positions: np.ndarray,
    rotations: np.ndarray,
    scales: np.ndarray,
) -> np.ndarray:
    """Build N model matrices from arrays of positions, quaternions, and scales.

    Inputs may be numpy arrays or any array-like (e.g. nested lists/tuples);
    they are passed through ``np.asarray`` before use, which is a no-op for
    arrays that are already ``np.ndarray``.

    Args:
        positions: (N, 3) positions.
        rotations: (N, 4) quaternions laid out as [w, x, y, z].
        scales: (N, 3) scale factors.

    Returns:
        (N, 4, 4) float32 model matrices (Translate * Rotate * Scale).
        The quaternions are used as given; no normalisation is performed.
    """
    # Accept plain sequences as well as ndarrays: the code below relies on
    # ``.shape`` and 2-D column slicing, which lists do not provide.
    positions = np.asarray(positions)
    rotations = np.asarray(rotations)
    scales = np.asarray(scales)

    n = positions.shape[0]

    w, x, y, z = rotations[:, 0], rotations[:, 1], rotations[:, 2], rotations[:, 3]

    # Pairwise quaternion products shared by several rotation-matrix entries.
    xx, yy, zz = x * x, y * y, z * z
    xy, xz, yz = x * y, x * z, y * z
    wx, wy, wz = w * x, w * y, w * z

    sx, sy, sz = scales[:, 0], scales[:, 1], scales[:, 2]

    out = np.zeros((n, 4, 4), dtype=np.float32)

    # Upper-left 3x3: rotation matrix with column j multiplied by scale j
    # (equivalent to Rotate @ Scale).
    out[:, 0, 0] = (1.0 - 2.0 * (yy + zz)) * sx
    out[:, 0, 1] = (2.0 * (xy - wz)) * sy
    out[:, 0, 2] = (2.0 * (xz + wy)) * sz

    out[:, 1, 0] = (2.0 * (xy + wz)) * sx
    out[:, 1, 1] = (1.0 - 2.0 * (xx + zz)) * sy
    out[:, 1, 2] = (2.0 * (yz - wx)) * sz

    out[:, 2, 0] = (2.0 * (xz - wy)) * sx
    out[:, 2, 1] = (2.0 * (yz + wx)) * sy
    out[:, 2, 2] = (1.0 - 2.0 * (xx + yy)) * sz

    # Last column: translation; bottom-right element: homogeneous 1.
    out[:, 0, 3] = positions[:, 0]
    out[:, 1, 3] = positions[:, 1]
    out[:, 2, 3] = positions[:, 2]
    out[:, 3, 3] = 1.0

    return out
[docs]
def mat4_to_bytes(m: np.ndarray) -> bytes:
    """Serialise a matrix to raw float32 bytes for GPU upload.

    Args:
        m: 4x4 numpy matrix.

    Returns:
        The elements converted to float32 and laid out in row-major (C)
        order; 64 bytes for a 4x4 input.
    """
    packed = np.ascontiguousarray(m, dtype=np.float32)
    return packed.tobytes()
# ============================================================================
# Coroutine helpers — parallel, wait, wait_until, wait_signal, next_frame
# ============================================================================
[docs]
def parallel(*coroutines: Coroutine) -> Coroutine:
    """Step several coroutines in lockstep until every one has finished.

    On each pass every still-running coroutine is advanced once with
    ``next``; those that raise ``StopIteration`` are dropped. The combined
    coroutine yields once per pass as long as at least one coroutine is
    still running afterwards, and ends when none are left.
    """
    running = list(coroutines)
    while running:
        survivors = []
        for routine in running:
            try:
                next(routine)
            except StopIteration:
                continue
            survivors.append(routine)
        running = survivors
        if running:
            yield
[docs]
def wait(seconds: float) -> Coroutine:
    """Suspend a coroutine until ``seconds`` have elapsed.

    The deadline is measured with ``time.monotonic()`` and is fixed when the
    coroutine is first stepped; it then yields on every step until the
    deadline has been reached.
    """
    deadline = time.monotonic() + seconds
    while deadline > time.monotonic():
        yield
[docs]
def wait_until(condition: Callable[[], bool]) -> Coroutine:
    """Keep yielding until ``condition()`` evaluates truthy.

    ``condition`` is called once per step, before each yield; the coroutine
    finishes on the first call that returns a truthy value.
    """
    while True:
        if condition():
            return
        yield
[docs]
def wait_signal(signal: Signal) -> Coroutine:
    """Yield until ``signal`` is emitted, then return the emitted arguments.

    A temporary handler is connected when the coroutine is first stepped and
    is always disconnected again when the coroutine finishes or is closed.

    Returns:
        The positional arguments the signal was emitted with, as a tuple
        (delivered as the coroutine's return value).
    """
    payload = None
    fired = False

    def _capture(*args):
        nonlocal payload, fired
        payload = args
        fired = True

    signal.connect(_capture)
    try:
        while not fired:
            yield
    finally:
        # Runs on normal completion and on early close/exception alike.
        signal.disconnect(_capture)
    return payload
[docs]
def next_frame() -> Coroutine:
    """Yield control exactly once, then finish (a one-frame pause)."""
    yield None
# ============================================================================
# Easing functions
# ============================================================================
[docs]
def ease_in_quad(t):
    """Quadratic ease-in: returns ``t`` squared."""
    squared = t * t
    return squared
[docs]
def ease_out_quad(t):
    """Quadratic ease-out: returns ``t * (2 - t)``."""
    remainder = 2 - t
    return t * remainder
[docs]
def ease_in_out_quad(t):
    """Quadratic ease-in-out.

    Uses ``2*t*t`` for ``t < 0.5`` and ``-1 + (4 - 2*t) * t`` otherwise.
    """
    if t < 0.5:
        return 2*t*t
    return -1 + (4 - 2*t) * t
[docs]
def ease_in_cubic(t):
    """Cubic ease-in: returns ``t`` cubed."""
    squared = t * t
    return squared * t
[docs]
def ease_out_cubic(t):
    """Cubic ease-out: returns ``1 - (1 - t) ** 3``."""
    remaining = 1 - t
    return 1 - remaining ** 3
# ============================================================================
# Raycasting helpers
# ============================================================================
[docs]
def screen_to_ray(screen_pos, screen_size, view, proj):
    """Turn a screen pixel coordinate into a world-space ray.

    Args:
        screen_pos: Pixel position, either a tuple/list ``(x, y)`` or an
            object exposing ``.x`` and ``.y``.
        screen_size: Viewport size, in the same two accepted forms.
        view: View matrix; anything that is not already an ``np.ndarray``
            is converted to a 4x4 float32 array.
        proj: Projection matrix, handled the same way as ``view``.

    Returns:
        ``(origin, direction)`` as Vec3: the point unprojected at NDC depth
        -1, and the normalised vector from it towards the point unprojected
        at NDC depth +1.
    """

    def _xy(pair):
        # Tuples/lists are indexed; anything else is read via .x / .y.
        if isinstance(pair, (tuple, list)):
            return float(pair[0]), float(pair[1])
        return float(pair.x), float(pair.y)

    def _as_mat4(matrix):
        if isinstance(matrix, np.ndarray):
            return matrix
        return np.array(matrix, dtype=np.float32).reshape(4, 4)

    px, py = _xy(screen_pos)
    width, height = _xy(screen_size)

    # Pixel -> normalised device coordinates; pixel y=0 maps to NDC y=+1.
    ndc_x = (2.0 * px / width) - 1.0
    ndc_y = 1.0 - (2.0 * py / height)

    view = _as_mat4(view)
    proj = _as_mat4(proj)

    # Detect Vulkan Y-flip
    if proj[1, 1] < 0:
        ndc_y = -ndc_y

    inv_vp = np.linalg.inv(proj @ view)

    clip_near = np.array([ndc_x, ndc_y, -1.0, 1.0], dtype=np.float32)
    clip_far = np.array([ndc_x, ndc_y, 1.0, 1.0], dtype=np.float32)
    near = inv_vp @ clip_near
    far = inv_vp @ clip_far

    # Perspective divide back to 3-D points.
    near_pos = near[:3] / near[3]
    far_pos = far[:3] / far[3]

    direction = far_pos - near_pos
    direction = direction / np.linalg.norm(direction)

    return Vec3(near_pos), Vec3(direction)
[docs]
def ray_intersect_sphere(origin, direction, center, radius: float):
    """Ray-sphere intersection test.

    Args:
        origin: Ray origin; wrapped in Vec3 if it is not one already.
        direction: Ray direction; wrapped in Vec3 if it is not one already.
            The quadratic is solved without its leading coefficient, so the
            result is a true distance only for a unit-length direction.
        center: Sphere centre; wrapped in Vec3 if it is not one already.
        radius: Sphere radius.

    Returns:
        The smallest non-negative ray parameter ``t`` at which the ray meets
        the sphere, or None if there is no such hit.
    """
    origin, direction, center = (
        value if isinstance(value, Vec3) else Vec3(value)
        for value in (origin, direction, center)
    )

    to_origin = origin - center
    half_b = to_origin.dot(direction)
    c_term = to_origin.dot(to_origin) - radius * radius

    discriminant = half_b * half_b - c_term
    if discriminant < 0:
        return None

    root = math.sqrt(discriminant)

    # Prefer the nearer root; fall back to the farther one when the nearer
    # root lies behind the origin (e.g. the origin is inside the sphere).
    near_t = -half_b - root
    if near_t >= 0:
        return near_t
    far_t = -half_b + root
    return far_t if far_t >= 0 else None
# ============================================================================
# Debug logging hooks — zero cost when Debug.enabled is False
# ============================================================================
# Cached reference to the Debug class; filled in on the first _get_debug() call.
_debug_Debug: type | None = None


def _get_debug() -> type:
    """Return the ``Debug`` class, importing it from ``.debug`` on first use.

    The class is stored in the module-level ``_debug_Debug`` so the import
    statement runs only until a non-None value has been cached.
    """
    global _debug_Debug
    cached = _debug_Debug
    if cached is not None:
        return cached
    from .debug import Debug
    _debug_Debug = Debug
    return Debug
def _debug_log_event(event_type, event, target, outcome):
    """Forward an event record to the UI inspector when debugging is on.

    Does nothing unless both ``Debug.enabled`` and
    ``Debug.ui_inspector.enabled`` are truthy.
    """
    debug_cls = _get_debug()
    if not debug_cls.enabled or not debug_cls.ui_inspector.enabled:
        return
    debug_cls.ui_inspector.log_event(event_type, event, target, outcome)
def _debug_log_hit(point, result):
    """Forward a hit-test record to the UI inspector when debugging is on.

    Does nothing unless both ``Debug.enabled`` and
    ``Debug.ui_inspector.enabled`` are truthy.
    """
    debug_cls = _get_debug()
    if not debug_cls.enabled or not debug_cls.ui_inspector.enabled:
        return
    debug_cls.ui_inspector.log_hit_test(point, result)
def _debug_log_focus(old, new):
    """Forward a focus-change record to the UI inspector when debugging is on.

    Does nothing unless both ``Debug.enabled`` and
    ``Debug.ui_inspector.enabled`` are truthy.
    """
    debug_cls = _get_debug()
    if not debug_cls.enabled or not debug_cls.ui_inspector.enabled:
        return
    debug_cls.ui_inspector.log_focus_change(old, new)