"""SceneTree — Central manager for the node tree, groups, input routing, and UI focus."""
from __future__ import annotations
import logging
from typing import Any
import numpy as np
from .events import InputEvent
from .helpers import ray_intersect_sphere, screen_to_ray
from .node import Node
from .nodes_2d.camera import Camera2D
from .nodes_3d.camera import Camera3D
from .nodes_3d.mesh import MeshInstance3D
from .physics_nodes import CollisionShape3D
from .ui_input import UIInputManager
log = logging.getLogger(__name__)
[docs]
class SceneTree:
"""Central manager for the node tree, groups, input routing, and UI focus.
Owns the root node and drives per-frame ``process`` / ``physics_process`` /
``draw`` traversals. Also manages pause state, scene changes, and the UI
input pipeline (mouse, keyboard, popups).
"""
@property
def app(self):
"""The App instance running this tree (set by graphics backend)."""
return getattr(self, "_app", None)
def __init__(self, screen_size=None):
    """Create an empty tree: no root, no autoloads, fresh UI input state.

    Args:
        screen_size: Initial screen size in any form ``_normalize_size``
            accepts (a 2-tuple, an object with ``x``/``y``, or an indexable
            such as a list or NumPy array). ``None`` selects ``(800, 600)``.
    """
    # Test for None explicitly. ``screen_size or (800, 600)`` asks the
    # argument for its truth value, which raises ValueError for a NumPy
    # array with more than one element, even though ``_normalize_size``
    # (and the ``screen_size`` setter) handle such indexables fine.
    if screen_size is None:
        screen_size = (800, 600)
    self.root: Node | None = None
    self._screen_size: tuple[float, float] = SceneTree._normalize_size(screen_size)
    self.paused: bool = False
    self._delete_queue: list[Node] = []
    self._groups: dict[str, set[Node]] = {}
    self._autoloads: dict[str, Node] = {}
    self._unique_nodes: dict[str, Node] = {}
    self._ui = UIInputManager()
    self._current_camera_2d: Camera2D | None = None
    self.auto_physics: bool = True  # Automatically step PhysicsServer each physics tick
    self.overlay_offset: tuple[float, float] = (0.0, 0.0)  # 2D offset for Text2D/particle overlays
    self.play_viewport_rect: tuple[float, float, float, float] | None = None  # (x, y, w, h) to constrain 3D rendering
    self._structure_version: int = 0  # Incremented on add_child/remove_child for cache invalidation
# -- UI state forwarding (preserves external interface) --
@property
def _focused_control(self):
return self._ui._focused_control
@_focused_control.setter
def _focused_control(self, v):
self._ui._focused_control = v
@property
def _mouse_grab(self) -> Any:
return self._ui._mouse_grab
@_mouse_grab.setter
def _mouse_grab(self, v):
self._ui._mouse_grab = v
@property
def _last_mouse_pos(self):
return self._ui._last_mouse_pos
@_last_mouse_pos.setter
def _last_mouse_pos(self, v):
self._ui._last_mouse_pos = v
@property
def _popup_stack(self) -> list:
return self._ui._popup_stack
@_popup_stack.setter
def _popup_stack(self, v):
self._ui._popup_stack = v
@property
def _shortcut_handler(self):
return self._ui._shortcut_handler
@_shortcut_handler.setter
def _shortcut_handler(self, v):
self._ui._shortcut_handler = v
@staticmethod
def _normalize_size(sz) -> tuple[float, float]:
"""Coerce any screen_size representation to a plain tuple once."""
if isinstance(sz, tuple) and len(sz) == 2:
return sz
if hasattr(sz, 'x'):
return (float(sz.x), float(sz.y))
return (float(sz[0]), float(sz[1]))
@property
def screen_size(self) -> tuple[float, float]:
return self._screen_size
@screen_size.setter
def screen_size(self, value):
old = self._screen_size
self._screen_size = SceneTree._normalize_size(value)
if self._screen_size != old:
self._invalidate_draw_caches()
[docs]
def set_root(self, root: Node):
    """Install *root* as the scene root and bring it into the tree.

    Steps, in order: log the call at debug level, store the node in
    ``self.root``, call ``root._enter_tree(self)``, then
    ``root._ready_recursive()``. A previously stored root is simply
    overwritten here; this method does not call ``_exit_tree`` on it.
    """
    log.debug("SceneTree.set_root(%s)", root)
    self.root = root
    # The node must be in the tree before its ready pass runs.
    root._enter_tree(self)
    root._ready_recursive()
[docs]
def change_scene(self, new_root: Node):
    """Swap the current scene root for *new_root*; autoloads are kept.

    The old root (if any) gets ``_exit_tree()``. The group and unique-node
    indexes are then emptied and rebuilt from the autoloads alone, the
    pending delete queue is dropped, UI input state is reset, the current
    2D camera is cleared, and finally ``set_root(new_root)`` runs.
    """
    outgoing = self.root
    log.debug("SceneTree.change_scene(%s), old root=%s", new_root, outgoing)
    if outgoing:
        outgoing._exit_tree()

    # Start both indexes from scratch and repopulate them from the
    # autoloads only; the outgoing scene must not leave entries behind.
    self._groups.clear()
    self._unique_nodes.clear()
    for persistent in self._autoloads.values():
        self._reregister_groups(persistent)
        self._reregister_unique(persistent)

    self._delete_queue.clear()
    self._ui.reset()
    self._current_camera_2d = None
    self.set_root(new_root)
[docs]
def process(self, dt: float):
    """Advance the tree by one frame.

    Increments ``Control._current_frame``, calls
    ``_process_recursive(dt, self.paused)`` on every autoload and then on
    the root (if set), and finally flushes the queued deletions.

    Args:
        dt: Frame delta passed through to each node.
    """
    from .ui.core import Control
    Control._current_frame += 1
    # Walk a snapshot: a callback that calls add_autoload()/remove_autoload()
    # would otherwise resize the dict mid-iteration and raise RuntimeError.
    for node in tuple(self._autoloads.values()):
        node._process_recursive(dt, self.paused)
    if self.root:
        self.root._process_recursive(dt, self.paused)
    self._flush_deletes()
[docs]
def physics_process(self, dt: float):
"""Run physics_process callbacks on all nodes, then auto-step physics."""
for node in self._autoloads.values():
node._physics_process_recursive(dt, self.paused)
if self.root:
self.root._physics_process_recursive(dt, self.paused)
if self.auto_physics:
from .physics.engine import PhysicsServer
server = PhysicsServer.get()
if server and server.body_count > 0:
server.step(dt)
[docs]
def draw(self, renderer):
cam = self._current_camera_2d
_has = hasattr(renderer, 'push_transform')
if _has and cam is not None:
z = cam.zoom if cam.zoom > 0 else 1.0
sw, sh = self._screen_size
renderer.push_transform(
z, 0.0, 0.0, z,
-float(cam._current[0]) * z + sw * 0.5,
-float(cam._current[1]) * z + sh * 0.5,
)
if self.root:
self.root._draw_recursive(renderer)
if _has and cam is not None:
renderer.pop_transform()
# Draw popups last (on top of everything, in screen space)
if self._ui._popup_stack:
# Force a new rendering layer so popup fills draw over prior lines
if hasattr(renderer, 'new_layer'):
renderer.new_layer()
# Reset clip to full screen so popups aren't clipped by parent containers
if hasattr(renderer, 'reset_clip'):
renderer.reset_clip()
for popup in self._ui._popup_stack:
popup.draw_popup(renderer)
[docs]
def get_group(self, name: str) -> list[Node]:
"""Get all nodes in a group."""
return list(self._groups.get(name, ()))
# --- Autoloads (singletons that persist across scene changes) ---
@property
def autoloads(self) -> dict[str, Node]:
"""Read-only view of registered autoloads."""
return self._autoloads
[docs]
def add_autoload(self, name: str, node: Node):
"""Register a node as an autoload singleton. Autoloads persist across change_scene()."""
self._autoloads[name] = node
node._enter_tree(self)
node._ready_recursive()
[docs]
def remove_autoload(self, name: str):
"""Remove and detach an autoload singleton by name."""
node = self._autoloads.pop(name, None)
if node:
node._exit_tree()
# --- Unique nodes ---
[docs]
def get_unique(self, name: str) -> Node | None:
"""Get a unique node by name. Returns None if not found."""
return self._unique_nodes.get(name)
# --- Internal helpers ---
def _group_add(self, group: str, node: Node):
if group not in self._groups:
self._groups[group] = set()
self._groups[group].add(node)
def _group_remove(self, group: str, node: Node):
if group in self._groups:
self._groups[group].discard(node)
def _reregister_groups(self, node: Node):
"""Re-add a node (and descendants) to the group index."""
for group in node._groups:
self._group_add(group, node)
for child in node.children:
self._reregister_groups(child)
def _reregister_unique(self, node: Node):
"""Re-add a node (and descendants) to the unique-node index."""
if node.unique_name:
self._unique_nodes[node.name] = node
for child in node.children:
self._reregister_unique(child)
def _invalidate_draw_caches(self):
"""Invalidate all Control draw caches (e.g. on resize).
Uses an iterative stack to avoid Python recursion overhead on deep trees.
"""
if not self.root:
return
from .ui import Control
stack = [self.root]
while stack:
node = stack.pop()
if isinstance(node, Control):
node._draw_dirty = True
node._draw_cache = None
children = node.children
if children:
stack.extend(children)
def _queue_delete(self, node: Node):
self._delete_queue.append(node)
def _flush_deletes(self):
for node in self._delete_queue:
if node.parent:
node.parent.remove_child(node)
self._delete_queue.clear()
def _collect_render_queue(self):
    """Build the render batches for the current scene (backend use only).

    A fresh ``RenderQueue`` is filled by walking the tree from the root with
    ``_collect_meshes_recursive``; with no root it stays empty.

    Returns:
        Whatever ``RenderQueue.get_batches()`` returns. The original note
        describes this as a list of RenderBatch objects sorted by
        material/blend mode, meant to be fetched once per frame.
    """
    from .render_queue import RenderQueue
    batcher = RenderQueue()
    scene_root = self.root
    if scene_root:
        self._collect_meshes_recursive(scene_root, batcher)
    return batcher.get_batches()
def _collect_meshes_recursive(self, node: Node, queue):
    """Add *node* and its descendants to *queue*, parent first.

    Only ``MeshInstance3D`` nodes whose ``mesh`` is not ``None`` are added,
    with their mesh, material, model matrix and ``layer`` (0 when the node
    has no ``layer`` attribute).
    """
    drawable = isinstance(node, MeshInstance3D) and node.mesh is not None
    if drawable:
        queue.add_instance(
            node.mesh,
            node.material,
            node.model_matrix,
            getattr(node, 'layer', 0),
        )
    for descendant in node.children:
        self._collect_meshes_recursive(descendant, queue)
# ========================================================================
# UI Input System (delegated to UIInputManager)
# ========================================================================
def _set_focused_control(self, control):
"""Set the focused control (forwarded to UIInputManager)."""
self._ui._set_focused_control(control)
def _update_mouse_over_states(self, mouse_pos):
"""Update mouse_over state for all controls (forwarded to UIInputManager)."""
self._ui._update_mouse_over_states(self.root, mouse_pos)
def _find_control_at_point(self, point):
"""Find topmost control at screen position (forwarded to UIInputManager)."""
return self._ui._find_control_at_point(self.root, point)