"""SceneTree: Central manager for the node tree, groups, input routing, and UI focus."""
import logging
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any
import numpy as np
from .event_bus import EventBus
from .events import InputEvent, TreeInputEvent
if TYPE_CHECKING:
from .audio_listener import AudioListener2D, AudioListener3D
from .audio_protocol import (
AudioBackend,
AudioBusBackend,
AudioPlaybackBackend,
AudioStreamingBackend,
)
from .input.enums import JoyAxis, JoyButton, Key, MouseButton
from .math.raycast import ray_intersect_sphere, screen_to_ray
from .node import Node
from .nodes_2d.camera import Camera2D
from .nodes_3d.camera import Camera3D
from .nodes_3d.mesh import MeshInstance3D
from .physics_nodes import CollisionShape3D
from .signals import Signal
# The "events" autoload name is reserved for the engine-provided EventBus.
# Project TOMLs and runtime code may not declare or replace it.
RESERVED_AUTOLOAD_EVENTS = "events"
log = logging.getLogger(__name__)
def _match_mods(mods: tuple, event: TreeInputEvent) -> bool:
"""Match a filter's (ctrl, shift, alt, meta) spec against an event's modifier state.
Each spec entry is True (must be pressed), False (must not be pressed),
or None (don't care).
"""
cs, ss, als, ms = mods
if cs is not None and event.ctrl != cs:
return False
if ss is not None and event.shift != ss:
return False
if als is not None and event.alt != als:
return False
if ms is not None and event.meta != ms:
return False
return True
[docs]
class SceneTree:
"""Central manager for the node tree, groups, input routing, and UI focus.
Owns the root node and drives per-frame ``process`` / ``physics_process`` /
``draw`` traversals. Also manages pause state, scene changes, and the UI
input pipeline (mouse, keyboard, popups).
"""
# Most-recently-active tree, set on every ``set_root``/``change_scene``.
# InputMap reads it (best-effort) so ``add_action`` can warn when called
# after the first tick: a common mistake that silently drops bindings in
# the web exporter (which never invokes ``main()``). Kept as a weakref
# would be ideal but the tree owns user nodes, not the other way around,
# so a plain reference is fine and eviction happens at the next set_root.
_active_tree: SceneTree | None = None
[docs]
@classmethod
def current(cls) -> SceneTree | None:
"""Return the most recently activated SceneTree, or ``None``.
Activation happens automatically on ``set_root`` / ``change_scene``.
Used by ``InputSimulator`` to deliver scene-tree + UI-tree events
without requiring callers to thread a tree reference through.
"""
return cls._active_tree
[docs]
@property
def app(self):
"""The App instance running this tree (set by graphics backend)."""
return getattr(self, "_app", None)
[docs]
@property
def events(self) -> EventBus:
"""Engine-provided typed event bus.
Use ``tree.events.subscribe(EventCls, handler)`` to register a
handler and ``tree.events.publish(event)`` (or
``publish_deferred``) to dispatch. The bus survives
``change_scene()`` -- subscriptions held by autoloads or other
long-lived objects keep firing across scene swaps. Deferred
events queued during a frame are dispatched at the start of the
next ``process()`` tick, before any node ``_process`` runs.
"""
return self._events
[docs]
@property
def audio_backend(self) -> "AudioBackend | None":
"""The active audio backend, or ``None`` if none was initialised.
Returns the union :class:`AudioBackend` type for backwards
compatibility: callers that only need one facet should prefer the
narrowed :attr:`audio_playback` / :attr:`audio_streaming` /
:attr:`audio_buses` properties so the type checker enforces the
boundary. Reaching for the underscore-prefixed attribute is
engine-private and may change.
The engine sets this during ``App.run`` via ``make_backend``. Tests
and headless harnesses that don't initialise audio see ``None``.
"""
return getattr(self, "_audio_backend", None)
[docs]
def install_audio_backend(self, backend: "AudioPlaybackBackend") -> None:
"""Install the audio backend for this tree: the one canonical path.
This is the only supported way to attach a backend; assigning the
private ``_audio_backend`` slot is no longer part of the contract.
Used by ``App.run`` / ``WebApp`` at startup and by tests that inject a
:class:`NullAudioBackend` or a mock.
Semantics mirror startup driver selection in other engines (Godot's
``--audio-driver``, pyglet's ``audio`` option): a backend is installed
once, before the tree starts producing sound. Re-installing shuts the
previous backend down first so device handles don't leak.
Facet conformance is enforced where it is consumed, not here: the
:attr:`audio_playback` / :attr:`audio_streaming` / :attr:`audio_buses`
accessors narrow via ``isinstance`` and return ``None`` for a backend
that doesn't implement that facet, and callers raise
:class:`AudioCapabilityError` on ``None``. Gating here as well would
duplicate that boundary and reject legitimate partial backends.
"""
if backend is None:
raise TypeError("install_audio_backend requires a backend, got None")
previous = getattr(self, "_audio_backend", None)
if previous is not None and previous is not backend:
shutdown = getattr(previous, "shutdown", None)
if callable(shutdown):
shutdown()
self._audio_backend = backend
[docs]
@property
def audio_playback(self) -> "AudioPlaybackBackend | None":
"""The active backend narrowed to :class:`AudioPlaybackBackend`, or ``None``.
Always equals :attr:`audio_backend` cast to the playback facet when
a backend is present: every shipped backend implements playback,
including the silent :class:`NullAudioBackend`.
"""
from .audio_protocol import AudioPlaybackBackend as _APB
backend = self.audio_backend
if backend is None:
return None
return backend if isinstance(backend, _APB) else None
[docs]
@property
def audio_streaming(self) -> "AudioStreamingBackend | None":
"""The active backend narrowed to :class:`AudioStreamingBackend`, or ``None``.
Returns ``None`` when the active backend doesn't implement streaming
(the Null backend, any future no-device test stub). Callers that
need streaming (:class:`AudioSynth` driver, AudioWorklet feeds)
should raise :class:`AudioCapabilityError` on ``None``.
"""
from .audio_protocol import AudioStreamingBackend as _ASB
backend = self.audio_backend
if backend is None:
return None
return backend if isinstance(backend, _ASB) else None
[docs]
@property
def audio_buses(self) -> "AudioBusBackend | None":
"""The active backend narrowed to :class:`AudioBusBackend`, or ``None``.
Always equals :attr:`audio_backend` cast to the bus facet when a
backend is present: every shipped backend implements bus + capability
advertisement.
"""
from .audio_protocol import AudioBusBackend as _ABB
backend = self.audio_backend
if backend is None:
return None
return backend if isinstance(backend, _ABB) else None
[docs]
def audio_listener_3d(self) -> "AudioListener3D | None":
"""The active 3D audio listener, lazy-creating one if none exists.
Returns the most recently entered :class:`AudioListener3D` in the
scene. If none has been added, auto-creates a fallback parented
to the active ``Camera3D`` with a one-time warning. Returns
``None`` only if there's no camera either.
Audio players call this every frame, so the auto-creation is
cheap once it's happened (the cached listener is returned).
"""
listener = self._current_audio_listener_3d
if listener is not None:
return listener
from .audio_listener import _autocreate_listener_3d
return _autocreate_listener_3d(self)
[docs]
def audio_listener_2d(self) -> "AudioListener2D | None":
"""The active 2D audio listener, lazy-creating one if none exists.
Same contract as :meth:`audio_listener_3d` but for 2D.
"""
listener = self._current_audio_listener_2d
if listener is not None:
return listener
from .audio_listener import _autocreate_listener_2d
return _autocreate_listener_2d(self)
def __init__(self, screen_size=None, *, isolated_input: bool = False):
if isolated_input:
from .input.map import _InputMap
from .input.state import _Input
self._own_input_map = _InputMap()
self._own_input = _Input(input_map=self._own_input_map)
else:
from .input.map import _default_input_map
from .input.state import _default_input
self._own_input_map = _default_input_map
self._own_input = _default_input
self.root: Node | None = None
self._screen_size: tuple[float, float] = SceneTree._normalize_size(screen_size or (800, 600))
self.paused: bool = False
self._delete_queue: list[Node] = []
self._groups: dict[str, set[Node]] = {}
self._autoloads: dict[str, Node] = {}
# Engine-provided typed event bus, accessible as ``tree.events``. Held
# outside ``_autoloads`` because EventBus is not a Node and does not
# participate in the process/physics traversal -- the per-frame tick
# explicitly calls ``self._events.flush_deferred()``. The name
# ``events`` is reserved (see add_autoload() and project.py).
self._events: EventBus = EventBus()
self._unique_nodes: dict[str, Node] = {}
# Input dispatch tables: populated as @on_input handlers register on tree-enter.
# Keyed for O(1) lookup so input dispatch is O(handlers per event), not O(nodes).
self._action_handlers: dict[tuple[str, bool], list[tuple[Node, str, tuple]]] = {}
self._key_handlers: dict[tuple[Key, bool], list[tuple[Node, str, tuple]]] = {}
self._key_handlers_any: dict[bool, list[tuple[Node, str, tuple[Key, ...], tuple]]] = {True: [], False: []}
self._button_handlers: dict[tuple[MouseButton, bool], list[tuple[Node, str, tuple]]] = {}
self._motion_handlers: list[tuple[Node, str]] = []
self._scroll_handlers: list[tuple[Node, str]] = []
self._joy_button_handlers: dict[tuple[JoyButton, bool], list[tuple[Node, str]]] = {}
self._joy_axis_handlers: dict[JoyAxis, list[tuple[Node, str]]] = {}
self._catch_all_handlers: list[tuple[Node, str]] = []
self._unhandled_input_dirty: bool = True # rebuilt from tree walk on next propagate_input
from .ui.ui_input import UIInputManager # local import avoids cycle via ui/testing.py
self._ui = UIInputManager()
self._current_camera_2d: Camera2D | None = None
# Active audio listeners: set by AudioListener2D/3D's on_enter_tree.
# Audio players read these via the audio_listener_2d() / _3d()
# accessors below, which lazy-create a fallback at the active
# camera if no explicit listener exists in the scene.
self._current_audio_listener_2d: "AudioListener2D | None" = None
self._current_audio_listener_3d: "AudioListener3D | None" = None
self.auto_physics: bool = True # Automatically step PhysicsServer each physics tick
self.overlay_offset: tuple[float, float] = (0.0, 0.0) # 2D offset for Text2D/particle overlays
self.play_viewport_rect: tuple[float, float, float, float] | None = None # (x, y, w, h) to constrain 3D rendering
self._structure_version: int = 0 # Incremented on add_child/remove_child for cache invalidation
self._running: bool = True
# Frame counter: bumped at the start of every ``tick`` so callers
# (notably InputMap.add_action's late-call warning) can tell whether
# the scene has begun ticking. Stays at 0 inside the root node's
# initial ``ready`` callback so registrations there are silent.
self._tick_count: int = 0
# Monotonically-increasing scene time, in seconds. Accumulates ``dt``
# at the start of every ``tick()`` (after any ``App.time_scale``
# scaling has been applied to ``dt``) so node code can read a single
# shared clock instead of maintaining its own ``_time += dt``
# accumulator. Frozen while ``self.paused`` is True.
self._now: float = 0.0
# ``Property(on_change=..., coalesce=True)`` enqueues ``(obj, method_name)``
# pairs here instead of firing synchronously. Drained once at the end of
# ``process()``: multiple writes within one tick collapse to a single call.
# Set semantics dedupe automatically.
self._pending_coalesced_hooks: set[tuple[Any, str]] = set()
self.quit_requested: Signal = Signal()
# Modal lifecycle: fires when any Control's show_modal()/close_modal()
# pushes/pops the tree's modal stack. Receivers get the modal Control
# so external listeners (e.g. game-side pause bridges) can react
# without being involved in the modal itself.
self.modal_opened: Signal = Signal()
self.modal_closed: Signal = Signal()
# Fires whenever ``screen_size`` changes (window resize, fullscreen
# toggle, viewport swap). Receivers get the new ``(width, height)``
# tuple and can recompute layout once instead of every frame.
self.screen_resized: Signal = Signal()
# -- UI state forwarding (preserves external interface) --
@property
def _focused_control(self):
return self._ui._focused_control
@_focused_control.setter
def _focused_control(self, v):
self._ui._focused_control = v
@property
def _mouse_grab(self) -> Any:
return self._ui._mouse_grab
@_mouse_grab.setter
def _mouse_grab(self, v):
self._ui._mouse_grab = v
@property
def _last_mouse_pos(self):
return self._ui._last_mouse_pos
@_last_mouse_pos.setter
def _last_mouse_pos(self, v):
self._ui._last_mouse_pos = v
@property
def _modal_stack(self) -> list:
return self._ui._modal_stack
@_modal_stack.setter
def _modal_stack(self, v):
self._ui._modal_stack = v
@property
def _shortcut_handler(self):
return self._ui._shortcut_handler
@_shortcut_handler.setter
def _shortcut_handler(self, v):
self._ui._shortcut_handler = v
@staticmethod
def _normalize_size(sz) -> tuple[float, float]:
"""Coerce any screen_size representation to a plain tuple once."""
if isinstance(sz, tuple) and len(sz) == 2:
return sz
if hasattr(sz, 'x'):
return (float(sz.x), float(sz.y))
return (float(sz[0]), float(sz[1]))
[docs]
@property
def is_running(self) -> bool:
"""Whether the tree is actively being ticked by its driving app.
Set to False by :meth:`quit` (or the graphics backend's app ``quit()``),
signalling the main loop to exit at the end of the current frame.
"""
return self._running
[docs]
@property
def now(self) -> float:
"""Monotonically-increasing scene time, in seconds.
Accumulates ``dt`` at the start of every :meth:`tick` call (after
any :attr:`simvx.graphics.App.time_scale` scaling has been applied),
so slow-motion and hitstop also slow this clock. Frozen while
:attr:`paused` is True. Resets to ``0.0`` only by constructing a
fresh ``SceneTree``.
Use for time-based animation, slow-mo gating, and any "how long has
the scene been running" query: preferable to per-node
``self._time += dt`` accumulators because every consumer reads the
same monotonic value.
"""
return self._now
[docs]
def quit(self) -> None:
"""Request a clean shutdown of the running tree.
Emits :attr:`quit_requested` and flips :attr:`is_running` to False. The
driving app polls this state and exits its loop at the end of the
current frame. Safe to call from node callbacks or signal handlers.
"""
if not self._running:
return
self._running = False
self.quit_requested()
@property
def screen_size(self) -> tuple[float, float]:
return self._screen_size
[docs]
@screen_size.setter
def screen_size(self, value):
old = self._screen_size
self._screen_size = SceneTree._normalize_size(value)
if self._screen_size != old:
self._invalidate_draw_caches()
self.screen_resized(self._screen_size)
[docs]
def set_root(self, root: Node):
"""Set the root node of the scene tree.
Before the root enters the tree, any ``input_actions`` declared on
the root (class- or instance-level ``dict[str, list]``) is bulk-
registered with this tree's ``InputMap``. This is the canonical
replacement for the wrapper-class + ``on_ready`` boilerplate and
survives ``change_scene`` swaps -- every new root's actions are
re-registered automatically.
"""
log.debug("SceneTree.set_root(%s)", root)
# Publish "this is the active tree" so input map / asset paths that
# need cross-cutting context can find us without a parameter chain.
SceneTree._active_tree = self
self.root = root
self._register_declared_input_actions(root)
root._enter_tree(self)
root._ready_recursive()
def _register_declared_input_actions(self, root: Node) -> None:
actions = getattr(root, "input_actions", None)
if not actions:
return
if not isinstance(actions, dict):
log.warning(
"Root %s.input_actions must be a dict[str, list]; got %s -- skipping.",
type(root).__name__, type(actions).__name__,
)
return
for name, bindings in actions.items():
self._own_input_map.add_action(name, list(bindings) if bindings else None, _quiet=True)
[docs]
def change_scene(self, new_root: Node):
"""Swap the active root with ``new_root``.
The old root receives ``_exit_tree``; ``new_root`` then runs the full
``_enter_tree`` / ``_ready_recursive`` path, identical to the initial
root. Autoloads are left in place and their groups and unique-name
entries are re-registered on the rebuilt tree. Pending deletes, UI
popup state, and the active 2D camera are cleared.
Use this for title → gameplay → game-over navigation. See
:doc:`../patterns` for a full example.
"""
log.debug("SceneTree.change_scene(%s), old root=%s", new_root, self.root)
if self.root:
self.root._exit_tree()
# Rebuild groups from autoloads only (scene groups were cleared by _exit_tree)
self._groups.clear()
for node in self._autoloads.values():
self._reregister_groups(node)
self._unique_nodes.clear()
for node in self._autoloads.values():
self._reregister_unique(node)
self._delete_queue.clear()
self._ui.reset()
self._current_camera_2d = None
self.set_root(new_root)
[docs]
def tick(self, dt: float):
"""Run process callbacks and coroutines on all nodes for one frame.
Order: autoloads' ``_process`` first (Godot-style global singletons),
then ``self.events.flush_deferred()`` so deferred events queued during
the previous frame (process, physics, or input) reach all subscribers
before scene logic runs, then the scene root's ``_process``. Anything
``emit_deferred``'d during this frame's autoload pass is also drained
in the same flush, keeping the scene's view of the world consistent.
"""
with self.activate_input():
from .assets import AssetServer
from .ui.core import Control
Control._current_frame += 1
self._tick_count += 1
if not self.paused:
self._now += dt
# Asset-loader completions arrive on worker threads; drain them
# onto the main thread before any node code runs so handlers see
# a stable world.
if AssetServer._instance is not None:
AssetServer._instance.flush()
for node in self._autoloads.values():
node._process_recursive(dt, self.paused)
self._events.flush_deferred()
if self.root:
self.root._process_recursive(dt, self.paused)
self._flush_deletes()
self._flush_coalesced_hooks()
# Audio backend reconciliation. Native MiniaudioBackend uses this
# to detect AudioBus volume / effect-chain changes (Property
# mutations on AudioBus aren't on_change-instrumented: diffing
# snapshot is the canonical sync path). Legacy + Null backends
# are no-ops; web reads bus state during its drain. One-line
# call so live UI changes (sliders, toggles) take effect next
# frame on every backend.
backend = getattr(self, "_audio_backend", None)
if backend is not None:
from .audio_bus import AudioBusLayout
from .audio_errors import AudioError, raise_or_warn
try:
backend.sync_bus_layout(AudioBusLayout.get_default())
except AudioError as exc:
# Per-frame call: let the audio system's strict/warn-once
# policy decide whether to raise or log. Other exceptions
# propagate (genuine engine bugs shouldn't be hidden by
# the per-frame catch-all).
raise_or_warn(
exc,
key="audio.scene_tree.sync_bus_layout_failed",
message="audio backend sync_bus_layout failed",
)
def _flush_coalesced_hooks(self) -> None:
"""Drain pending ``on_change`` callbacks queued by ``coalesce=True`` Properties.
Iterates a snapshot so hooks that themselves write to ``coalesce`` properties
enqueue onto the next frame (the natural debounce). Missing methods are
ignored: the owning node may have been removed mid-frame.
"""
if not self._pending_coalesced_hooks:
return
pending = self._pending_coalesced_hooks
self._pending_coalesced_hooks = set()
for obj, method_name in pending:
method = getattr(obj, method_name, None)
if method is not None:
method()
[docs]
def physics_tick(self, dt: float):
"""Run physics_process callbacks on all nodes, then auto-step physics."""
with self.activate_input():
for node in self._autoloads.values():
node._physics_process_recursive(dt, self.paused)
if self.root:
self.root._physics_process_recursive(dt, self.paused)
if self.auto_physics:
from .physics import PhysicsServer
server = PhysicsServer.get()
if server and server.body_count > 0:
server.step(dt)
# -- Input dispatch table maintenance ---------------------------------
def _register_input_node(self, node: Node) -> None:
"""Insert *node*'s ``@on_input`` handlers into the dispatch tables."""
for method_name, filt in type(node)._simvx_input_handlers:
kind = filt["kind"]
target = filt["target"]
released = filt["released"]
mods = filt["mods"]
if kind == "action":
self._action_handlers.setdefault((target, released), []).append((node, method_name, mods))
elif kind == "key":
if len(target) == 1:
self._key_handlers.setdefault((target[0], released), []).append((node, method_name, mods))
else:
self._key_handlers_any[released].append((node, method_name, target, mods))
elif kind == "button":
self._button_handlers.setdefault((target, released), []).append((node, method_name, mods))
elif kind == "motion":
self._motion_handlers.append((node, method_name))
elif kind == "scroll":
self._scroll_handlers.append((node, method_name))
elif kind == "joy_button":
self._joy_button_handlers.setdefault((target, released), []).append((node, method_name))
elif kind == "joy_axis":
self._joy_axis_handlers.setdefault(target, []).append((node, method_name))
elif kind == "catch_all":
self._catch_all_handlers.append((node, method_name))
def _unregister_input_node(self, node: Node) -> None:
"""Remove *node*'s entries from the dispatch tables on tree-exit."""
def _drop(seq):
seq[:] = [t for t in seq if t[0] is not node]
for bucket in self._action_handlers.values():
_drop(bucket)
for bucket in self._key_handlers.values():
_drop(bucket)
for bucket in self._key_handlers_any.values():
_drop(bucket)
for bucket in self._button_handlers.values():
_drop(bucket)
_drop(self._motion_handlers)
_drop(self._scroll_handlers)
for bucket in self._joy_button_handlers.values():
_drop(bucket)
for bucket in self._joy_axis_handlers.values():
_drop(bucket)
_drop(self._catch_all_handlers)
def _actions_for_event(self, event: TreeInputEvent) -> list[str]:
"""Return action names whose bindings match *event*. Empty list if none."""
if event.type == "key":
target_key = event.key
target_button = None
target_joy_button = None
elif event.type == "mouse_button":
target_key = None
target_button = event.mouse_button
target_joy_button = None
elif event.type == "joy_button":
target_key = None
target_button = None
target_joy_button = event.joy_button
else:
return []
if target_key is None and target_button is None and target_joy_button is None:
return []
matched: list[str] = []
for action_name, bindings in self.input_map._actions.items():
for b in bindings:
if target_key is not None and b.key == target_key:
matched.append(action_name)
break
if target_button is not None and b.mouse_button == target_button:
matched.append(action_name)
break
if target_joy_button is not None and b.joy_button == target_joy_button:
matched.append(action_name)
break
return matched
def _collect_unhandled_handlers(self) -> list[tuple[Node, str]]:
"""Walk the tree to gather ``on_unhandled_input`` overrides + decorated handlers.
Computed lazily; tree mutations bump ``_structure_version`` so we
rebuild only when the topology changes.
"""
cached = getattr(self, "_unhandled_cache", None)
cached_version = getattr(self, "_unhandled_cache_version", -1)
if cached is not None and cached_version == self._structure_version:
return cached
result: list[tuple[Node, str]] = []
if self.root is not None:
for node in self.root.walk(include_self=True):
methods = type(node)._simvx_hooks.get("unhandled_input", ())
for m in methods:
result.append((node, m))
for autoload in self._autoloads.values():
for node in autoload.walk(include_self=True):
methods = type(node)._simvx_hooks.get("unhandled_input", ())
for m in methods:
result.append((node, m))
self._unhandled_cache = result
self._unhandled_cache_version = self._structure_version
return result
def _safe_invoke(self, node: Node, method_name: str, *args: Any) -> Any:
"""Call ``node.method_name(*args)`` with the same error containment Node uses."""
if node._script_error:
return None
try:
return getattr(node, method_name)(*args)
except AssertionError:
raise
except Exception:
if Node.strict_errors:
raise
node._script_error = True
import sys
import traceback
tb = traceback.format_exc()
print(f"Script error in {node.name}.{method_name}: node disabled:\n{tb}", file=sys.stderr)
log.error("Script error in %s.%s: node disabled", node.name, method_name)
try:
Node.script_error_raised.emit(node, method_name, tb)
except Exception:
# A handler of the global script-error signal itself raised.
# Log and continue: error-recovery code must not derail.
log.debug("script_error_raised handler failed", exc_info=True)
return None
[docs]
def render(self, renderer):
cam = self._current_camera_2d
_has = hasattr(renderer, 'push_transform')
if _has and cam is not None:
z = cam.zoom if cam.zoom > 0 else 1.0
sw, sh = self._screen_size
sx, sy = cam._shake_offset
renderer.push_transform(
z, 0.0, 0.0, z,
(-float(cam.current[0]) + float(sx)) * z + sw * 0.5,
(-float(cam.current[1]) + float(sy)) * z + sh * 0.5,
)
if self.root:
self.root._draw_recursive(renderer)
if _has and cam is not None:
renderer.pop_transform()
# Modal Controls render through the standard child-draw walk; the
# ``top_level`` flag plus a second screen-space pass keeps them above
# sibling Controls regardless of tree position. Submission order is
# the GPU order, so modals drawn after the main tree paint on top.
if self._ui._modal_stack:
if hasattr(renderer, 'reset_clip'):
renderer.reset_clip()
for modal in self._ui._modal_stack:
if getattr(modal, "top_level", False) and modal.visible:
modal._draw_recursive(renderer)
[docs]
def push_modal(self, control):
"""Register a control as an active modal (drawn on top, receives all UI input)."""
self._ui.push_modal(control)
self.modal_opened(control)
[docs]
def pop_modal(self, control):
"""Unregister a modal control."""
self._ui.pop_modal(control)
self.modal_closed(control)
[docs]
def get_group(self, name: str) -> list[Node]:
"""Get all nodes in a group."""
return list(self._groups.get(name, ()))
# --- Autoloads (singletons that persist across scene changes) ---
[docs]
@property
def autoloads(self) -> dict[str, Node]:
"""Read-only view of registered autoloads."""
return self._autoloads
[docs]
def add_autoload(self, name: str, node: Node):
"""Register ``node`` as a persistent singleton attached to the tree.
The node enters the tree and runs ``on_ready()`` immediately. Unlike a
regular child, it is not reachable via the scene root: retrieve it via
``tree.autoloads[name]``. Autoloads survive ``change_scene()``, making
them the canonical home for global state (score, settings, audio
manager). See :doc:`../patterns`.
The name ``"events"`` is reserved for the engine-provided
:class:`~simvx.core.event_bus.EventBus`; use ``tree.events`` instead.
"""
if name == RESERVED_AUTOLOAD_EVENTS:
raise ValueError(
"Autoload name 'events' is reserved for the engine-provided "
"EventBus. Access it via tree.events; pick a different name "
"for your autoload."
)
self._autoloads[name] = node
node._enter_tree(self)
node._ready_recursive()
[docs]
def remove_autoload(self, name: str):
"""Unregister and tear down an autoload. Calls ``_exit_tree`` on the node."""
node = self._autoloads.pop(name, None)
if node:
node._exit_tree()
# --- Unique nodes ---
[docs]
def get_unique(self, name: str) -> Node | None:
"""Get a unique node by name. Returns None if not found."""
return self._unique_nodes.get(name)
# --- Internal helpers ---
def _group_add(self, group: str, node: Node):
if group not in self._groups:
self._groups[group] = set()
self._groups[group].add(node)
def _group_remove(self, group: str, node: Node):
if group in self._groups:
self._groups[group].discard(node)
def _reregister_groups(self, node: Node):
"""Re-add a node (and descendants) to the group index."""
for group in node._groups:
self._group_add(group, node)
for child in node.children:
self._reregister_groups(child)
def _reregister_unique(self, node: Node):
"""Re-add a node (and descendants) to the unique-node index."""
if node.unique_name:
self._unique_nodes[node.name] = node
for child in node.children:
self._reregister_unique(child)
def _invalidate_draw_caches(self):
"""Invalidate draw caches of Controls whose rect depends on screen/parent size.
Only anchored controls (any of anchor_{left,top,right,bottom} != 0) have
their absolute rect affected by a screen-size change. Controls with the
default zero anchors have rects computed purely from their own
position/size, so their caches stay valid here: own-size changes are
handled by Property setters and own-position changes by
_invalidate_transform.
Uses an iterative stack to avoid Python recursion overhead on deep trees.
"""
if not self.root:
return
from .ui import Control
stack = [self.root]
while stack:
node = stack.pop()
if isinstance(node, Control) and node._draw_cache is not None:
if (node.anchor_left or node.anchor_top
or node.anchor_right or node.anchor_bottom):
node._draw_dirty = True
node._draw_cache = None
children = node.children
if children:
stack.extend(children)
def _queue_delete(self, node: Node):
self._delete_queue.append(node)
def _flush_deletes(self):
for node in self._delete_queue:
if node.parent:
node.parent.remove_child(node)
self._delete_queue.clear()
def _collect_render_queue(self):
"""Collect all renderable nodes into batches (backend use only).
Returns:
List of RenderBatch objects, sorted by material/blend mode.
Backends call this once per frame to get optimized draw list.
"""
from .render_queue import RenderQueue
queue = RenderQueue()
if self.root:
self._collect_meshes_recursive(self.root, queue)
return queue.sorted_batches
def _collect_meshes_recursive(self, node: Node, queue):
"""Traverse tree collecting MeshInstance3D nodes into batches."""
if isinstance(node, MeshInstance3D) and node.mesh is not None:
queue.add_instance(node.mesh, node.material, node.model_matrix, node.layer if hasattr(node, 'layer') else 0)
for child in node.children:
self._collect_meshes_recursive(child, queue)
# ========================================================================
# UI Input System (delegated to UIInputManager)
# ========================================================================
def _set_focused_control(self, control):
"""Set the focused control (forwarded to UIInputManager)."""
self._ui._set_focused_control(control)
def _update_mouse_over_states(self, mouse_pos):
"""Update mouse_over state for all controls (forwarded to UIInputManager)."""
self._ui._update_mouse_over_states(self.root, mouse_pos)
def _find_control_at_point(self, point):
"""Find topmost control at screen position (forwarded to UIInputManager)."""
return self._ui._find_control_at_point(self.root, point)