Source code for simvx.core.scene_tree

"""SceneTree: Central manager for the node tree, groups, input routing, and UI focus."""

import logging
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any

import numpy as np

from .event_bus import EventBus
from .events import InputEvent, TreeInputEvent

if TYPE_CHECKING:
    from .audio_listener import AudioListener2D, AudioListener3D
    from .audio_protocol import (
        AudioBackend,
        AudioBusBackend,
        AudioPlaybackBackend,
        AudioStreamingBackend,
    )
from .input.enums import JoyAxis, JoyButton, Key, MouseButton
from .math.raycast import ray_intersect_sphere, screen_to_ray
from .node import Node
from .nodes_2d.camera import Camera2D
from .nodes_3d.camera import Camera3D
from .nodes_3d.mesh import MeshInstance3D
from .physics_nodes import CollisionShape3D
from .signals import Signal

# The "events" autoload name is reserved for the engine-provided EventBus.
# Project TOMLs and runtime code may not declare or replace it.
RESERVED_AUTOLOAD_EVENTS = "events"

log = logging.getLogger(__name__)


def _match_mods(mods: tuple, event: TreeInputEvent) -> bool:
    """Match a filter's (ctrl, shift, alt, meta) spec against an event's modifier state.

    Each spec entry is True (must be pressed), False (must not be pressed),
    or None (don't care).
    """
    cs, ss, als, ms = mods
    if cs is not None and event.ctrl != cs:
        return False
    if ss is not None and event.shift != ss:
        return False
    if als is not None and event.alt != als:
        return False
    if ms is not None and event.meta != ms:
        return False
    return True

[docs] class SceneTree: """Central manager for the node tree, groups, input routing, and UI focus. Owns the root node and drives per-frame ``process`` / ``physics_process`` / ``draw`` traversals. Also manages pause state, scene changes, and the UI input pipeline (mouse, keyboard, popups). """ # Most-recently-active tree, set on every ``set_root``/``change_scene``. # InputMap reads it (best-effort) so ``add_action`` can warn when called # after the first tick: a common mistake that silently drops bindings in # the web exporter (which never invokes ``main()``). Kept as a weakref # would be ideal but the tree owns user nodes, not the other way around, # so a plain reference is fine and eviction happens at the next set_root. _active_tree: SceneTree | None = None
[docs] @classmethod def current(cls) -> SceneTree | None: """Return the most recently activated SceneTree, or ``None``. Activation happens automatically on ``set_root`` / ``change_scene``. Used by ``InputSimulator`` to deliver scene-tree + UI-tree events without requiring callers to thread a tree reference through. """ return cls._active_tree
[docs] @property def app(self): """The App instance running this tree (set by graphics backend).""" return getattr(self, "_app", None)
[docs] @property def events(self) -> EventBus: """Engine-provided typed event bus. Use ``tree.events.subscribe(EventCls, handler)`` to register a handler and ``tree.events.publish(event)`` (or ``publish_deferred``) to dispatch. The bus survives ``change_scene()`` -- subscriptions held by autoloads or other long-lived objects keep firing across scene swaps. Deferred events queued during a frame are dispatched at the start of the next ``process()`` tick, before any node ``_process`` runs. """ return self._events
[docs] @property def audio_backend(self) -> "AudioBackend | None": """The active audio backend, or ``None`` if none was initialised. Returns the union :class:`AudioBackend` type for backwards compatibility: callers that only need one facet should prefer the narrowed :attr:`audio_playback` / :attr:`audio_streaming` / :attr:`audio_buses` properties so the type checker enforces the boundary. Reaching for the underscore-prefixed attribute is engine-private and may change. The engine sets this during ``App.run`` via ``make_backend``. Tests and headless harnesses that don't initialise audio see ``None``. """ return getattr(self, "_audio_backend", None)
[docs] def install_audio_backend(self, backend: "AudioPlaybackBackend") -> None: """Install the audio backend for this tree: the one canonical path. This is the only supported way to attach a backend; assigning the private ``_audio_backend`` slot is no longer part of the contract. Used by ``App.run`` / ``WebApp`` at startup and by tests that inject a :class:`NullAudioBackend` or a mock. Semantics mirror startup driver selection in other engines (Godot's ``--audio-driver``, pyglet's ``audio`` option): a backend is installed once, before the tree starts producing sound. Re-installing shuts the previous backend down first so device handles don't leak. Facet conformance is enforced where it is consumed, not here: the :attr:`audio_playback` / :attr:`audio_streaming` / :attr:`audio_buses` accessors narrow via ``isinstance`` and return ``None`` for a backend that doesn't implement that facet, and callers raise :class:`AudioCapabilityError` on ``None``. Gating here as well would duplicate that boundary and reject legitimate partial backends. """ if backend is None: raise TypeError("install_audio_backend requires a backend, got None") previous = getattr(self, "_audio_backend", None) if previous is not None and previous is not backend: shutdown = getattr(previous, "shutdown", None) if callable(shutdown): shutdown() self._audio_backend = backend
[docs] @property def audio_playback(self) -> "AudioPlaybackBackend | None": """The active backend narrowed to :class:`AudioPlaybackBackend`, or ``None``. Always equals :attr:`audio_backend` cast to the playback facet when a backend is present: every shipped backend implements playback, including the silent :class:`NullAudioBackend`. """ from .audio_protocol import AudioPlaybackBackend as _APB backend = self.audio_backend if backend is None: return None return backend if isinstance(backend, _APB) else None
[docs] @property def audio_streaming(self) -> "AudioStreamingBackend | None": """The active backend narrowed to :class:`AudioStreamingBackend`, or ``None``. Returns ``None`` when the active backend doesn't implement streaming (the Null backend, any future no-device test stub). Callers that need streaming (:class:`AudioSynth` driver, AudioWorklet feeds) should raise :class:`AudioCapabilityError` on ``None``. """ from .audio_protocol import AudioStreamingBackend as _ASB backend = self.audio_backend if backend is None: return None return backend if isinstance(backend, _ASB) else None
[docs] @property def audio_buses(self) -> "AudioBusBackend | None": """The active backend narrowed to :class:`AudioBusBackend`, or ``None``. Always equals :attr:`audio_backend` cast to the bus facet when a backend is present: every shipped backend implements bus + capability advertisement. """ from .audio_protocol import AudioBusBackend as _ABB backend = self.audio_backend if backend is None: return None return backend if isinstance(backend, _ABB) else None
[docs] def audio_listener_3d(self) -> "AudioListener3D | None": """The active 3D audio listener, lazy-creating one if none exists. Returns the most recently entered :class:`AudioListener3D` in the scene. If none has been added, auto-creates a fallback parented to the active ``Camera3D`` with a one-time warning. Returns ``None`` only if there's no camera either. Audio players call this every frame, so the auto-creation is cheap once it's happened (the cached listener is returned). """ listener = self._current_audio_listener_3d if listener is not None: return listener from .audio_listener import _autocreate_listener_3d return _autocreate_listener_3d(self)
[docs] def audio_listener_2d(self) -> "AudioListener2D | None": """The active 2D audio listener, lazy-creating one if none exists. Same contract as :meth:`audio_listener_3d` but for 2D. """ listener = self._current_audio_listener_2d if listener is not None: return listener from .audio_listener import _autocreate_listener_2d return _autocreate_listener_2d(self)
[docs] @property def input(self): """The Input instance for this tree (per-tree isolation).""" return self._own_input
[docs] @property def input_map(self): """The InputMap instance for this tree (per-tree isolation).""" return self._own_input_map
[docs] @contextmanager def activate_input(self): """Context manager to make this tree's Input/InputMap the active ones. Use this when processing the tree so that game code calling ``Input.is_action_pressed(...)`` sees this tree's input state. """ from .input.map import set_active_input_map from .input.state import set_active_input with set_active_input(self._own_input), set_active_input_map(self._own_input_map): yield
def __init__(self, screen_size=None, *, isolated_input: bool = False): if isolated_input: from .input.map import _InputMap from .input.state import _Input self._own_input_map = _InputMap() self._own_input = _Input(input_map=self._own_input_map) else: from .input.map import _default_input_map from .input.state import _default_input self._own_input_map = _default_input_map self._own_input = _default_input self.root: Node | None = None self._screen_size: tuple[float, float] = SceneTree._normalize_size(screen_size or (800, 600)) self.paused: bool = False self._delete_queue: list[Node] = [] self._groups: dict[str, set[Node]] = {} self._autoloads: dict[str, Node] = {} # Engine-provided typed event bus, accessible as ``tree.events``. Held # outside ``_autoloads`` because EventBus is not a Node and does not # participate in the process/physics traversal -- the per-frame tick # explicitly calls ``self._events.flush_deferred()``. The name # ``events`` is reserved (see add_autoload() and project.py). self._events: EventBus = EventBus() self._unique_nodes: dict[str, Node] = {} # Input dispatch tables: populated as @on_input handlers register on tree-enter. # Keyed for O(1) lookup so input dispatch is O(handlers per event), not O(nodes). self._action_handlers: dict[tuple[str, bool], list[tuple[Node, str, tuple]]] = {} self._key_handlers: dict[tuple[Key, bool], list[tuple[Node, str, tuple]]] = {} self._key_handlers_any: dict[bool, list[tuple[Node, str, tuple[Key, ...], tuple]]] = {True: [], False: []} self._button_handlers: dict[tuple[MouseButton, bool], list[tuple[Node, str, tuple]]] = {} self._motion_handlers: list[tuple[Node, str]] = [] self._scroll_handlers: list[tuple[Node, str]] = [] self._joy_button_handlers: dict[tuple[JoyButton, bool], list[tuple[Node, str]]] = {} self._joy_axis_handlers: dict[JoyAxis, list[tuple[Node, str]]] = {} self._catch_all_handlers: list[tuple[Node, str]] = [] self._unhandled_input_dirty: bool = True # rebuilt from tree walk on next propagate_input from .ui.ui_input import UIInputManager # local import avoids cycle via ui/testing.py self._ui = UIInputManager() self._current_camera_2d: Camera2D | None = None # Active audio listeners: set by AudioListener2D/3D's on_enter_tree. # Audio players read these via the audio_listener_2d() / _3d() # accessors below, which lazy-create a fallback at the active # camera if no explicit listener exists in the scene. self._current_audio_listener_2d: "AudioListener2D | None" = None self._current_audio_listener_3d: "AudioListener3D | None" = None self.auto_physics: bool = True # Automatically step PhysicsServer each physics tick self.overlay_offset: tuple[float, float] = (0.0, 0.0) # 2D offset for Text2D/particle overlays self.play_viewport_rect: tuple[float, float, float, float] | None = None # (x, y, w, h) to constrain 3D rendering self._structure_version: int = 0 # Incremented on add_child/remove_child for cache invalidation self._running: bool = True # Frame counter: bumped at the start of every ``tick`` so callers # (notably InputMap.add_action's late-call warning) can tell whether # the scene has begun ticking. Stays at 0 inside the root node's # initial ``ready`` callback so registrations there are silent. self._tick_count: int = 0 # Monotonically-increasing scene time, in seconds. Accumulates ``dt`` # at the start of every ``tick()`` (after any ``App.time_scale`` # scaling has been applied to ``dt``) so node code can read a single # shared clock instead of maintaining its own ``_time += dt`` # accumulator. Frozen while ``self.paused`` is True. self._now: float = 0.0 # ``Property(on_change=..., coalesce=True)`` enqueues ``(obj, method_name)`` # pairs here instead of firing synchronously. Drained once at the end of # ``process()``: multiple writes within one tick collapse to a single call. # Set semantics dedupe automatically. self._pending_coalesced_hooks: set[tuple[Any, str]] = set() self.quit_requested: Signal = Signal() # Modal lifecycle: fires when any Control's show_modal()/close_modal() # pushes/pops the tree's modal stack. Receivers get the modal Control # so external listeners (e.g. game-side pause bridges) can react # without being involved in the modal itself. self.modal_opened: Signal = Signal() self.modal_closed: Signal = Signal() # Fires whenever ``screen_size`` changes (window resize, fullscreen # toggle, viewport swap). Receivers get the new ``(width, height)`` # tuple and can recompute layout once instead of every frame. self.screen_resized: Signal = Signal() # -- UI state forwarding (preserves external interface) -- @property def _focused_control(self): return self._ui._focused_control @_focused_control.setter def _focused_control(self, v): self._ui._focused_control = v @property def _mouse_grab(self) -> Any: return self._ui._mouse_grab @_mouse_grab.setter def _mouse_grab(self, v): self._ui._mouse_grab = v @property def _last_mouse_pos(self): return self._ui._last_mouse_pos @_last_mouse_pos.setter def _last_mouse_pos(self, v): self._ui._last_mouse_pos = v @property def _modal_stack(self) -> list: return self._ui._modal_stack @_modal_stack.setter def _modal_stack(self, v): self._ui._modal_stack = v @property def _shortcut_handler(self): return self._ui._shortcut_handler @_shortcut_handler.setter def _shortcut_handler(self, v): self._ui._shortcut_handler = v @staticmethod def _normalize_size(sz) -> tuple[float, float]: """Coerce any screen_size representation to a plain tuple once.""" if isinstance(sz, tuple) and len(sz) == 2: return sz if hasattr(sz, 'x'): return (float(sz.x), float(sz.y)) return (float(sz[0]), float(sz[1]))
[docs] @property def is_running(self) -> bool: """Whether the tree is actively being ticked by its driving app. Set to False by :meth:`quit` (or the graphics backend's app ``quit()``), signalling the main loop to exit at the end of the current frame. """ return self._running
[docs] @property def now(self) -> float: """Monotonically-increasing scene time, in seconds. Accumulates ``dt`` at the start of every :meth:`tick` call (after any :attr:`simvx.graphics.App.time_scale` scaling has been applied), so slow-motion and hitstop also slow this clock. Frozen while :attr:`paused` is True. Resets to ``0.0`` only by constructing a fresh ``SceneTree``. Use for time-based animation, slow-mo gating, and any "how long has the scene been running" query: preferable to per-node ``self._time += dt`` accumulators because every consumer reads the same monotonic value. """ return self._now
[docs] def quit(self) -> None: """Request a clean shutdown of the running tree. Emits :attr:`quit_requested` and flips :attr:`is_running` to False. The driving app polls this state and exits its loop at the end of the current frame. Safe to call from node callbacks or signal handlers. """ if not self._running: return self._running = False self.quit_requested()
@property def screen_size(self) -> tuple[float, float]: return self._screen_size
[docs] @screen_size.setter def screen_size(self, value): old = self._screen_size self._screen_size = SceneTree._normalize_size(value) if self._screen_size != old: self._invalidate_draw_caches() self.screen_resized(self._screen_size)
[docs] def set_root(self, root: Node): """Set the root node of the scene tree. Before the root enters the tree, any ``input_actions`` declared on the root (class- or instance-level ``dict[str, list]``) is bulk- registered with this tree's ``InputMap``. This is the canonical replacement for the wrapper-class + ``on_ready`` boilerplate and survives ``change_scene`` swaps -- every new root's actions are re-registered automatically. """ log.debug("SceneTree.set_root(%s)", root) # Publish "this is the active tree" so input map / asset paths that # need cross-cutting context can find us without a parameter chain. SceneTree._active_tree = self self.root = root self._register_declared_input_actions(root) root._enter_tree(self) root._ready_recursive()
def _register_declared_input_actions(self, root: Node) -> None: actions = getattr(root, "input_actions", None) if not actions: return if not isinstance(actions, dict): log.warning( "Root %s.input_actions must be a dict[str, list]; got %s -- skipping.", type(root).__name__, type(actions).__name__, ) return for name, bindings in actions.items(): self._own_input_map.add_action(name, list(bindings) if bindings else None, _quiet=True)
[docs] def change_scene(self, new_root: Node): """Swap the active root with ``new_root``. The old root receives ``_exit_tree``; ``new_root`` then runs the full ``_enter_tree`` / ``_ready_recursive`` path, identical to the initial root. Autoloads are left in place and their groups and unique-name entries are re-registered on the rebuilt tree. Pending deletes, UI popup state, and the active 2D camera are cleared. Use this for title → gameplay → game-over navigation. See :doc:`../patterns` for a full example. """ log.debug("SceneTree.change_scene(%s), old root=%s", new_root, self.root) if self.root: self.root._exit_tree() # Rebuild groups from autoloads only (scene groups were cleared by _exit_tree) self._groups.clear() for node in self._autoloads.values(): self._reregister_groups(node) self._unique_nodes.clear() for node in self._autoloads.values(): self._reregister_unique(node) self._delete_queue.clear() self._ui.reset() self._current_camera_2d = None self.set_root(new_root)
[docs] def tick(self, dt: float): """Run process callbacks and coroutines on all nodes for one frame. Order: autoloads' ``_process`` first (Godot-style global singletons), then ``self.events.flush_deferred()`` so deferred events queued during the previous frame (process, physics, or input) reach all subscribers before scene logic runs, then the scene root's ``_process``. Anything ``emit_deferred``'d during this frame's autoload pass is also drained in the same flush, keeping the scene's view of the world consistent. """ with self.activate_input(): from .assets import AssetServer from .ui.core import Control Control._current_frame += 1 self._tick_count += 1 if not self.paused: self._now += dt # Asset-loader completions arrive on worker threads; drain them # onto the main thread before any node code runs so handlers see # a stable world. if AssetServer._instance is not None: AssetServer._instance.flush() for node in self._autoloads.values(): node._process_recursive(dt, self.paused) self._events.flush_deferred() if self.root: self.root._process_recursive(dt, self.paused) self._flush_deletes() self._flush_coalesced_hooks() # Audio backend reconciliation. Native MiniaudioBackend uses this # to detect AudioBus volume / effect-chain changes (Property # mutations on AudioBus aren't on_change-instrumented: diffing # snapshot is the canonical sync path). Legacy + Null backends # are no-ops; web reads bus state during its drain. One-line # call so live UI changes (sliders, toggles) take effect next # frame on every backend. backend = getattr(self, "_audio_backend", None) if backend is not None: from .audio_bus import AudioBusLayout from .audio_errors import AudioError, raise_or_warn try: backend.sync_bus_layout(AudioBusLayout.get_default()) except AudioError as exc: # Per-frame call: let the audio system's strict/warn-once # policy decide whether to raise or log. Other exceptions # propagate (genuine engine bugs shouldn't be hidden by # the per-frame catch-all). raise_or_warn( exc, key="audio.scene_tree.sync_bus_layout_failed", message="audio backend sync_bus_layout failed", )
def _flush_coalesced_hooks(self) -> None: """Drain pending ``on_change`` callbacks queued by ``coalesce=True`` Properties. Iterates a snapshot so hooks that themselves write to ``coalesce`` properties enqueue onto the next frame (the natural debounce). Missing methods are ignored: the owning node may have been removed mid-frame. """ if not self._pending_coalesced_hooks: return pending = self._pending_coalesced_hooks self._pending_coalesced_hooks = set() for obj, method_name in pending: method = getattr(obj, method_name, None) if method is not None: method()
[docs] def physics_tick(self, dt: float): """Run physics_process callbacks on all nodes, then auto-step physics.""" with self.activate_input(): for node in self._autoloads.values(): node._physics_process_recursive(dt, self.paused) if self.root: self.root._physics_process_recursive(dt, self.paused) if self.auto_physics: from .physics import PhysicsServer server = PhysicsServer.get() if server and server.body_count > 0: server.step(dt)
[docs] def propagate_input(self, event: TreeInputEvent) -> None: """Dispatch an input event to registered ``@on_input`` handlers. Looks up handlers via the typed dispatch tables (built when nodes carrying ``@on_input``-decorated methods enter the tree). The traversal is O(handlers per event), not O(nodes): nodes without any input handlers cost nothing. A handler returning a truthy value marks the event consumed: ``on_unhandled_input`` only fires if nothing consumed the event. ``event.handled`` is also flipped True so callers can short-circuit. """ handled = False # Action filters: route via the active InputMap's bindings. for action_name in self._actions_for_event(event): released_key = (action_name, not event.pressed) for node, method_name, mods in self._action_handlers.get(released_key, ()): if not _match_mods(mods, event): continue if getattr(node, method_name)(event): handled = True # Direct typed dispatch. if event.type == "key" and event.key is not None: released = not event.pressed for node, method_name, mods in self._key_handlers.get((event.key, released), ()): if not _match_mods(mods, event): continue if getattr(node, method_name)(event): handled = True for node, method_name, keys, mods in self._key_handlers_any.get(released, ()): if event.key not in keys: continue if not _match_mods(mods, event): continue if getattr(node, method_name)(event): handled = True elif event.type == "mouse_button" and event.mouse_button is not None: released = not event.pressed for node, method_name, mods in self._button_handlers.get((event.mouse_button, released), ()): if not _match_mods(mods, event): continue if getattr(node, method_name)(event): handled = True elif event.type == "mouse_motion": for node, method_name in self._motion_handlers: if getattr(node, method_name)(event): handled = True elif event.type == "scroll": for node, method_name in self._scroll_handlers: if getattr(node, method_name)(event): handled = True elif event.type == "joy_button" and event.joy_button is not None: released = not event.pressed for node, method_name in self._joy_button_handlers.get((event.joy_button, released), ()): if getattr(node, method_name)(event): handled = True elif event.type == "joy_axis" and event.joy_axis is not None: for node, method_name in self._joy_axis_handlers.get(event.joy_axis, ()): if getattr(node, method_name)(event): handled = True # Catch-all fires for every event type. for node, method_name in self._catch_all_handlers: if getattr(node, method_name)(event): handled = True event.handled = event.handled or handled # Unhandled chain: only fires if nothing consumed. if not handled: for node, method_name in self._collect_unhandled_handlers(): self._safe_invoke(node, method_name, event)
# -- Input dispatch table maintenance --------------------------------- def _register_input_node(self, node: Node) -> None: """Insert *node*'s ``@on_input`` handlers into the dispatch tables.""" for method_name, filt in type(node)._simvx_input_handlers: kind = filt["kind"] target = filt["target"] released = filt["released"] mods = filt["mods"] if kind == "action": self._action_handlers.setdefault((target, released), []).append((node, method_name, mods)) elif kind == "key": if len(target) == 1: self._key_handlers.setdefault((target[0], released), []).append((node, method_name, mods)) else: self._key_handlers_any[released].append((node, method_name, target, mods)) elif kind == "button": self._button_handlers.setdefault((target, released), []).append((node, method_name, mods)) elif kind == "motion": self._motion_handlers.append((node, method_name)) elif kind == "scroll": self._scroll_handlers.append((node, method_name)) elif kind == "joy_button": self._joy_button_handlers.setdefault((target, released), []).append((node, method_name)) elif kind == "joy_axis": self._joy_axis_handlers.setdefault(target, []).append((node, method_name)) elif kind == "catch_all": self._catch_all_handlers.append((node, method_name)) def _unregister_input_node(self, node: Node) -> None: """Remove *node*'s entries from the dispatch tables on tree-exit.""" def _drop(seq): seq[:] = [t for t in seq if t[0] is not node] for bucket in self._action_handlers.values(): _drop(bucket) for bucket in self._key_handlers.values(): _drop(bucket) for bucket in self._key_handlers_any.values(): _drop(bucket) for bucket in self._button_handlers.values(): _drop(bucket) _drop(self._motion_handlers) _drop(self._scroll_handlers) for bucket in self._joy_button_handlers.values(): _drop(bucket) for bucket in self._joy_axis_handlers.values(): _drop(bucket) _drop(self._catch_all_handlers) def _actions_for_event(self, event: TreeInputEvent) -> list[str]: """Return action names whose bindings match *event*. Empty list if none.""" if event.type == "key": target_key = event.key target_button = None target_joy_button = None elif event.type == "mouse_button": target_key = None target_button = event.mouse_button target_joy_button = None elif event.type == "joy_button": target_key = None target_button = None target_joy_button = event.joy_button else: return [] if target_key is None and target_button is None and target_joy_button is None: return [] matched: list[str] = [] for action_name, bindings in self.input_map._actions.items(): for b in bindings: if target_key is not None and b.key == target_key: matched.append(action_name) break if target_button is not None and b.mouse_button == target_button: matched.append(action_name) break if target_joy_button is not None and b.joy_button == target_joy_button: matched.append(action_name) break return matched def _collect_unhandled_handlers(self) -> list[tuple[Node, str]]: """Walk the tree to gather ``on_unhandled_input`` overrides + decorated handlers. Computed lazily; tree mutations bump ``_structure_version`` so we rebuild only when the topology changes. """ cached = getattr(self, "_unhandled_cache", None) cached_version = getattr(self, "_unhandled_cache_version", -1) if cached is not None and cached_version == self._structure_version: return cached result: list[tuple[Node, str]] = [] if self.root is not None: for node in self.root.walk(include_self=True): methods = type(node)._simvx_hooks.get("unhandled_input", ()) for m in methods: result.append((node, m)) for autoload in self._autoloads.values(): for node in autoload.walk(include_self=True): methods = type(node)._simvx_hooks.get("unhandled_input", ()) for m in methods: result.append((node, m)) self._unhandled_cache = result self._unhandled_cache_version = self._structure_version return result def _safe_invoke(self, node: Node, method_name: str, *args: Any) -> Any: """Call ``node.method_name(*args)`` with the same error containment Node uses.""" if node._script_error: return None try: return getattr(node, method_name)(*args) except AssertionError: raise except Exception: if Node.strict_errors: raise node._script_error = True import sys import traceback tb = traceback.format_exc() print(f"Script error in {node.name}.{method_name}: node disabled:\n{tb}", file=sys.stderr) log.error("Script error in %s.%s: node disabled", node.name, method_name) try: Node.script_error_raised.emit(node, method_name, tb) except Exception: # A handler of the global script-error signal itself raised. # Log and continue: error-recovery code must not derail. log.debug("script_error_raised handler failed", exc_info=True) return None
[docs] def render(self, renderer): cam = self._current_camera_2d _has = hasattr(renderer, 'push_transform') if _has and cam is not None: z = cam.zoom if cam.zoom > 0 else 1.0 sw, sh = self._screen_size sx, sy = cam._shake_offset renderer.push_transform( z, 0.0, 0.0, z, (-float(cam.current[0]) + float(sx)) * z + sw * 0.5, (-float(cam.current[1]) + float(sy)) * z + sh * 0.5, ) if self.root: self.root._draw_recursive(renderer) if _has and cam is not None: renderer.pop_transform() # Modal Controls render through the standard child-draw walk; the # ``top_level`` flag plus a second screen-space pass keeps them above # sibling Controls regardless of tree position. Submission order is # the GPU order, so modals drawn after the main tree paint on top. if self._ui._modal_stack: if hasattr(renderer, 'reset_clip'): renderer.reset_clip() for modal in self._ui._modal_stack: if getattr(modal, "top_level", False) and modal.visible: modal._draw_recursive(renderer)
[docs] def push_modal(self, control): """Register a control as an active modal (drawn on top, receives all UI input).""" self._ui.push_modal(control) self.modal_opened(control)
[docs] def pop_modal(self, control): """Unregister a modal control.""" self._ui.pop_modal(control) self.modal_closed(control)
[docs] def input_cast(self, screen_pos: tuple[float, float] | np.ndarray, button: MouseButton = MouseButton.LEFT): """Cast a ray from screen_pos through the camera into the scene. Finds the nearest pickable CollisionShape3D and delivers an InputEvent to its parent node.""" if not self.root: return cameras = self.root.find_all(Camera3D) if not cameras: return camera = cameras[0] # screen_size is normalized to tuple: no isinstance per call sw, sh = self.screen_size aspect = sw / sh if sh > 0 else 1.0 view = camera.view_matrix proj = camera.projection_matrix(aspect) origin, direction = screen_to_ray(screen_pos, self.screen_size, view, proj) # Find nearest pickable collision shape best_t = float('inf') best_node = None for shape in self.root.find_all(CollisionShape3D): if not shape.pickable: continue t = ray_intersect_sphere(origin, direction, shape.world_position, shape.radius) if t is not None and t < best_t: best_t = t best_node = shape.parent if shape.parent else shape if best_node is not None: event = InputEvent(screen_pos, button, origin, direction, best_t) best_node.on_picked(event)
[docs] def get_group(self, name: str) -> list[Node]: """Get all nodes in a group.""" return list(self._groups.get(name, ()))
# --- Autoloads (singletons that persist across scene changes) ---
[docs] @property def autoloads(self) -> dict[str, Node]: """Read-only view of registered autoloads.""" return self._autoloads
[docs] def add_autoload(self, name: str, node: Node): """Register ``node`` as a persistent singleton attached to the tree. The node enters the tree and runs ``on_ready()`` immediately. Unlike a regular child, it is not reachable via the scene root: retrieve it via ``tree.autoloads[name]``. Autoloads survive ``change_scene()``, making them the canonical home for global state (score, settings, audio manager). See :doc:`../patterns`. The name ``"events"`` is reserved for the engine-provided :class:`~simvx.core.event_bus.EventBus`; use ``tree.events`` instead. """ if name == RESERVED_AUTOLOAD_EVENTS: raise ValueError( "Autoload name 'events' is reserved for the engine-provided " "EventBus. Access it via tree.events; pick a different name " "for your autoload." ) self._autoloads[name] = node node._enter_tree(self) node._ready_recursive()
[docs] def remove_autoload(self, name: str): """Unregister and tear down an autoload. Calls ``_exit_tree`` on the node.""" node = self._autoloads.pop(name, None) if node: node._exit_tree()
# --- Unique nodes ---
[docs] def get_unique(self, name: str) -> Node | None: """Get a unique node by name. Returns None if not found.""" return self._unique_nodes.get(name)
# --- Internal helpers --- def _group_add(self, group: str, node: Node): if group not in self._groups: self._groups[group] = set() self._groups[group].add(node) def _group_remove(self, group: str, node: Node): if group in self._groups: self._groups[group].discard(node) def _reregister_groups(self, node: Node): """Re-add a node (and descendants) to the group index.""" for group in node._groups: self._group_add(group, node) for child in node.children: self._reregister_groups(child) def _reregister_unique(self, node: Node): """Re-add a node (and descendants) to the unique-node index.""" if node.unique_name: self._unique_nodes[node.name] = node for child in node.children: self._reregister_unique(child) def _invalidate_draw_caches(self): """Invalidate draw caches of Controls whose rect depends on screen/parent size. Only anchored controls (any of anchor_{left,top,right,bottom} != 0) have their absolute rect affected by a screen-size change. Controls with the default zero anchors have rects computed purely from their own position/size, so their caches stay valid here: own-size changes are handled by Property setters and own-position changes by _invalidate_transform. Uses an iterative stack to avoid Python recursion overhead on deep trees. """ if not self.root: return from .ui import Control stack = [self.root] while stack: node = stack.pop() if isinstance(node, Control) and node._draw_cache is not None: if (node.anchor_left or node.anchor_top or node.anchor_right or node.anchor_bottom): node._draw_dirty = True node._draw_cache = None children = node.children if children: stack.extend(children) def _queue_delete(self, node: Node): self._delete_queue.append(node) def _flush_deletes(self): for node in self._delete_queue: if node.parent: node.parent.remove_child(node) self._delete_queue.clear() def _collect_render_queue(self): """Collect all renderable nodes into batches (backend use only). Returns: List of RenderBatch objects, sorted by material/blend mode. Backends call this once per frame to get optimized draw list. """ from .render_queue import RenderQueue queue = RenderQueue() if self.root: self._collect_meshes_recursive(self.root, queue) return queue.sorted_batches def _collect_meshes_recursive(self, node: Node, queue): """Traverse tree collecting MeshInstance3D nodes into batches.""" if isinstance(node, MeshInstance3D) and node.mesh is not None: queue.add_instance(node.mesh, node.material, node.model_matrix, node.layer if hasattr(node, 'layer') else 0) for child in node.children: self._collect_meshes_recursive(child, queue) # ======================================================================== # UI Input System (delegated to UIInputManager) # ========================================================================
[docs] def ui_input(self, mouse_pos: tuple[float, float] | np.ndarray = None, button: MouseButton | None = None, pressed: bool = True, key: str = "", char: str = ""): """Route UI input events to controls. ``button`` is a ``MouseButton`` for press/release, ``None`` for keyboard / char / pure mouse-move events. """ if button is not None and not isinstance(button, MouseButton): raise TypeError( f"SceneTree.ui_input: button must be MouseButton or None, got {type(button).__name__}" ) self._ui.ui_input(self.root, mouse_pos=mouse_pos, button=button, pressed=pressed, key=key, char=char)
[docs] def touch_input(self, finger_id: int, action: int, x: float, y: float): """Route multi-touch events to controls with touch_mode='multi'.""" self._ui.touch_input(self.root, finger_id, action, x, y)
def _set_focused_control(self, control): """Set the focused control (forwarded to UIInputManager).""" self._ui._set_focused_control(control) def _update_mouse_over_states(self, mouse_pos): """Update mouse_over state for all controls (forwarded to UIInputManager).""" self._ui._update_mouse_over_states(self.root, mouse_pos) def _find_control_at_point(self, point): """Find topmost control at screen position (forwarded to UIInputManager).""" return self._ui._find_control_at_point(self.root, point)