Source code for simvx.core.node

"""Node: Base node class with tree hierarchy, groups, and coroutine support."""

import ast
import functools
import inspect
import logging
import textwrap
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, ClassVar

from .decorators import collect_hooks
from .descriptors import Children, Coroutine, CoroutineHandle, Notification, ProcessMode, Property
from .signals import Signal
from .events import InputEvent, TreeInputEvent

log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from .scene_tree import SceneTree

# Lazy-cached reference for circular import
_ui_Control: type | None = None

def _get_control() -> type:
    global _ui_Control
    if _ui_Control is None:
        from .ui import Control
        _ui_Control = Control
    return _ui_Control

def _init_calls_super(func) -> bool:
    """Check via AST whether *func* contains a ``super().__init__(...)`` call.

    Both the zero-argument form and ``super(Cls, self).__init__(...)`` are
    recognised. The whole source of *func* is walked, so a call anywhere
    inside the function body counts.

    Returns:
        ``True`` when such a call is found, and also when the source cannot
        be retrieved or parsed (the caller then assumes the user handles
        ``super()`` and leaves ``__init__`` unwrapped). ``False`` otherwise.
    """
    try:
        source = textwrap.dedent(inspect.getsource(func))
        tree = ast.parse(source)
    except (OSError, TypeError, SyntaxError):
        # SyntaxError (which includes IndentationError) covers source that
        # getsource() returned but that does not parse on its own, e.g. a
        # file edited on disk after import or a fragment of a larger
        # expression. Treat it like any other "cannot inspect" case instead
        # of letting it abort class creation.
        return True  # Cannot inspect: assume user handles super
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        f = node.func
        # super().__init__(...) or super(Cls, self).__init__(...)
        if isinstance(f, ast.Attribute) and f.attr == "__init__" and isinstance(f.value, ast.Call):
            inner = f.value
            if isinstance(inner.func, ast.Name) and inner.func.id == "super":
                return True
    return False

class Node:
    """Base node with tree hierarchy, groups, and coroutine support.

    Attributes:
        name: Unique name within the parent's children. Defaults to the class name.
        parent: The parent ``Node``, or ``None`` if this is the root.
        children: Ordered collection of child nodes, accessible by name or index.
        visible: Whether this node (and its descendants) should be drawn.
        process_mode: Controls processing behaviour during pause (``INHERIT``,
            ``PAUSABLE``, ``PAUSED_ONLY``, ``ALWAYS``, ``DISABLED``).
        script: Optional file path to an attached script.
        unique_name: When ``True``, the node is registered in the tree for fast
            lookup via ``SceneTree.get_unique_node()``.

    Example::

        root = Node(name="Root")
        child = Node(name="Child")
        root.add_child(child)
        assert child.parent is root
        assert root.children["Child"] is child
    """

    _registry: ClassVar[dict[str, type]] = {}
    strict_errors: ClassVar[bool] = True  # Raise on script errors; set False for release
    script_error_raised = Signal()  # emits (node, method_name, traceback_str)

    # Engine kwargs consumed by Node.__init__, not forwarded to user __init__ unless explicitly accepted
    _NODE_INIT_KWARGS = frozenset({"name"})

    # Filled by __init_subclass__: hook name -> tuple of method names to invoke per dispatch.
    _simvx_hooks: ClassVar[dict[str, tuple[str, ...]]] = {}
    # Filled by __init_subclass__: ordered tuple of (method_name, filter_dict) for input handlers.
    _simvx_input_handlers: ClassVar[tuple[tuple[str, dict[str, Any]], ...]] = ()

    _PRIMARY_HOOK_METHODS: ClassVar[tuple[str, ...]] = (
        "on_ready",
        "on_process",
        "on_physics_process",
        "on_enter_tree",
        "on_exit_tree",
        "on_draw",
        "on_picked",
        "on_unhandled_input",
    )

    # Bare names commonly mistaken for SimVX hooks (the engine only dispatches the on_-prefixed forms).
    _BARE_HOOK_NAMES: ClassVar[tuple[str, ...]] = (
        "ready",
        "process",
        "physics_process",
        "draw",
        "input",
    )

    def __init_subclass__(cls, **kwargs):
        """Register the subclass, collect its hooks and auto-wrap its ``__init__``.

        When the subclass defines its own ``__init__`` that does not call
        ``super().__init__`` (and has not set ``__auto_init__ = False``), the
        ``__init__`` is replaced by a wrapper that first runs the parent
        ``__init__`` with the kwargs it understands and then the user's
        ``__init__`` with the remainder.

        Raises:
            TypeError: The subclass defines a callable named like a bare hook
                (``ready``, ``process``, ...) instead of the ``on_`` form.
        """
        super().__init_subclass__(**kwargs)
        Node._registry[cls.__name__] = cls

        # Lint: catch bare hook names (e.g. `def ready(self)`) that the engine silently never invokes.
        for name in cls._BARE_HOOK_NAMES:
            if name in cls.__dict__ and callable(cls.__dict__[name]):
                raise TypeError(
                    f"{cls.__name__}.{name}: not a SimVX hook: did you mean 'on_{name}'?"
                )

        # Collect lifecycle and input handlers (decorated + same-named overrides)
        # walking the MRO most-derived-last, mirroring Property.__set_name__.
        cls._simvx_hooks, cls._simvx_input_handlers = collect_hooks(cls, cls._PRIMARY_HOOK_METHODS)

        # Auto-super: wrap user __init__ that doesn't call super().__init__
        if "__init__" not in cls.__dict__:
            return  # No custom __init__: nothing to wrap
        if cls.__dict__.get("__auto_init__") is False:
            return  # Opted out
        user_init = cls.__dict__["__init__"]
        if _init_calls_super(user_init):
            return  # User handles super(): don't wrap

        user_sig = inspect.signature(user_init)
        user_params = user_sig.parameters

        # Determine which params (beyond self) the user accepts
        user_param_names = [n for n in user_params if n != "self"]
        has_var_keyword = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in user_params.values())
        has_var_positional = any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in user_params.values())

        # Positional param names (POSITIONAL_ONLY or POSITIONAL_OR_KEYWORD), in order
        positional_names = [
            n
            for n, p in user_params.items()
            if n != "self"
            and p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
        ]

        # Find the parent __init__ to call (the next in MRO that is not the user's)
        parent_init = None
        for base in cls.__mro__[1:]:
            if "__init__" in base.__dict__:
                parent_init = base.__dict__["__init__"]
                break
        if parent_init is None:
            parent_init = Node.__init__

        # Inspect parent __init__ signature to know what it accepts.
        # follow_wrapped=False gets the actual wrapper signature (not the original
        # user function's signature that functools.wraps copies).
        try:
            parent_sig = inspect.signature(parent_init, follow_wrapped=False)
            parent_param_names = {n for n in parent_sig.parameters if n != "self"}
            parent_has_var_kw = any(
                p.kind == inspect.Parameter.VAR_KEYWORD for p in parent_sig.parameters.values()
            )
        except (ValueError, TypeError):
            parent_param_names = set()
            parent_has_var_kw = True  # Assume flexible

        has_on_change = getattr(cls, '_has_on_change_hooks', False)

        @functools.wraps(user_init)
        def _wrapped_init(self, *args, **all_kwargs):
            # Map positional args to their parameter names so we can split by name
            user_args = args
            if args and not has_var_positional:
                # Without *args in the user signature every positional value must
                # map onto a named parameter; reject extras instead of dropping them.
                if len(args) > len(positional_names):
                    raise TypeError(
                        f"{cls.__name__}.__init__() takes at most {len(positional_names)} "
                        f"positional argument(s) ({len(args)} given)"
                    )
                for pname, val in zip(positional_names, args):
                    if pname in all_kwargs:
                        raise TypeError(f"__init__() got multiple values for argument '{pname}'")
                    all_kwargs[pname] = val
                user_args = ()  # All positional mapped to kwargs

            # Split kwargs: parent_kw goes to parent __init__, user_fwd goes to user's __init__
            props = cls.get_properties()
            parent_kw = {}
            user_fwd = {}
            for k, v in all_kwargs.items():
                is_user_param = k in user_param_names
                is_parent_named = k in parent_param_names  # Explicitly named in parent sig
                is_prop = k in props
                is_engine = k in Node._NODE_INIT_KWARGS
                if is_user_param:
                    user_fwd[k] = v
                    if is_parent_named or is_prop or is_engine:
                        parent_kw[k] = v  # Also pass to parent (e.g. 'name', properties)
                elif is_parent_named or is_prop or is_engine:
                    parent_kw[k] = v  # Known parent/engine/property kwarg
                elif has_var_keyword:
                    user_fwd[k] = v  # Unknown kwarg, user accepts **kwargs
                elif parent_has_var_kw:
                    parent_kw[k] = v  # Fallback to parent **kwargs (will warn)
                else:
                    raise TypeError(f"{cls.__name__}.__init__() got unexpected keyword argument {k!r}")

            # Open the on_change deferral scope only if no enclosing wrapper did.
            we_set_init_scope = has_on_change and not getattr(self, '_on_change_init', False)
            if we_set_init_scope:
                self._on_change_init = True
                self._on_change_pending = []
            try:
                # Initialise via parent chain (e.g. Node2D.__init__ -> Node.__init__)
                parent_init(self, **parent_kw)
                # Forward to user's __init__
                if has_var_keyword or has_var_positional:
                    user_init(self, *user_args, **user_fwd)
                elif user_param_names:
                    forward = {k: v for k, v in user_fwd.items() if k in user_param_names}
                    user_init(self, *user_args, **forward)
                else:
                    user_init(self)
            finally:
                if we_set_init_scope:
                    self._on_change_init = False
                    self._flush_on_change_pending()

        cls.__init__ = _wrapped_init

    # Class-level marker that lets ``_draw_recursive`` cheaply detect CanvasLayer
    # children without importing the subclass (which lives in a sibling module
    # and would otherwise force a per-frame ``from ... import CanvasLayer``).
    # ``CanvasLayer`` overrides this to ``True``.
    _is_canvas_layer: bool = False

    def __init__(self, name: str = "", **kwargs):
        """Create a detached node.

        Args:
            name: Node name; an empty value falls back to the class name.
            **kwargs: Initial values for declared ``Property`` descriptors.

        Raises:
            TypeError: ``name`` is a truthy non-string, or a kwarg does not
                match any declared property.
        """
        if name and not isinstance(name, str):
            raise TypeError(f"Node name must be a string, got {type(name).__name__}")
        self._name = name or type(self).__name__
        self.parent: Node | None = None
        self.children = Children()
        # Count of direct children whose ``_is_canvas_layer`` flag is True.
        # Maintained wherever children are attached/detached so
        # ``_draw_recursive`` can skip the sort/partition fast path when zero.
        self._canvas_layer_child_count: int = 0
        self._tree: SceneTree | None = None
        self._coroutines: list[Coroutine] = []
        self._groups: set[str] = set()
        self._scene_template_path: str | None = None
        self.script: str | None = None
        self._script_embedded: str | None = None  # source stored in scene, importable
        self._script_module = None  # ModuleType | None: cached loaded module
        self._script_original_class: type | None = None  # original class before script swap
        self._visible: bool = True
        self._visible_in_hierarchy: bool = True
        self._process_mode: ProcessMode = ProcessMode.INHERIT
        self._cached_process_mode: ProcessMode | None = None  # cached resolved mode
        self.unique_name: bool = False
        self._script_error: bool = False
        self._outgoing_connections: list = []  # signals connected via this node's bound methods

        # Open the on_change deferral scope if this class has any on_change
        # hooks AND no enclosing init wrapper has already opened one. The flag
        # set here is cleared in the matching `finally` after kwargs are applied.
        we_set_init_scope = (
            getattr(type(self), '_has_on_change_hooks', False)
            and not getattr(self, '_on_change_init', False)
        )
        if we_set_init_scope:
            self._on_change_init = True
            self._on_change_pending = []
        try:
            # Apply Property values passed as kwargs
            props = self.get_properties()
            for key, val in kwargs.items():
                if key in props:
                    setattr(self, key, val)
                else:
                    raise TypeError(f"{type(self).__name__}: unknown kwarg {key!r}")
        finally:
            if we_set_init_scope:
                self._on_change_init = False
                self._flush_on_change_pending()

    @property
    def name(self) -> str:
        """The node's name, used as its key in the parent's children."""
        return self._name

    @name.setter
    def name(self, value: str):
        old = self._name
        self._name = value
        # ``parent`` may not exist yet if a subclass sets the name very early.
        parent = getattr(self, 'parent', None)
        if parent is not None:
            # Keep the parent's name -> node index in step with the rename.
            names = parent.children._names
            if old and names.get(old) is self:
                del names[old]
            if value:
                names[value] = self

    @property
    def process_mode(self) -> ProcessMode:
        """The node's own (unresolved) process mode."""
        return self._process_mode

    @process_mode.setter
    def process_mode(self, value: ProcessMode):
        self._process_mode = value
        self._invalidate_process_mode_cache()

    @property
    def visible(self) -> bool:
        """The node's own visibility flag (ignores ancestors)."""
        return self._visible

    @visible.setter
    def visible(self, value: bool):
        value = bool(value)
        if value == self._visible:
            return
        self._visible = value
        parent_effective = True if self.parent is None else self.parent._visible_in_hierarchy
        self._propagate_visibility(parent_effective)
        self._notification(Notification.VISIBILITY_CHANGED)

    def _propagate_visibility(self, parent_effective: bool) -> None:
        """Update _visible_in_hierarchy for self and descendants.

        Propagates the effective visibility down the subtree; prunes branches
        whose effective state didn't change so toggles cost O(changed-subtree)
        rather than O(whole-subtree).
        """
        new_effective = parent_effective and self._visible
        if self._visible_in_hierarchy == new_effective:
            return
        self._visible_in_hierarchy = new_effective
        for child in self.children:
            child._propagate_visibility(new_effective)

    def _invalidate_process_mode_cache(self):
        """Clear cached process mode for this node and descendants that inherit."""
        self._cached_process_mode = None
        for child in self.children:
            if child._process_mode == ProcessMode.INHERIT:
                child._invalidate_process_mode_cache()

    def _notification(self, what: Notification) -> None:
        """Called when a notification is dispatched. Override to handle."""

    def reset_error(self) -> None:
        """Clear script error flag to re-enable processing."""
        self._script_error = False

    def _flush_on_change_pending(self) -> None:
        """Dispatch on_change hooks queued during ``__init__``.

        Hooks are deduplicated by ``(property_attr, method_name)`` so multiple
        sets of the same property during construction fire the hook once.
        Order of first occurrence is preserved.
        """
        pending = getattr(self, '_on_change_pending', None)
        if not pending:
            return
        self._on_change_pending = []
        seen: set[tuple[str, str]] = set()
        for prop_attr, method_name in pending:
            key = (prop_attr, method_name)
            if key in seen:
                continue
            seen.add(key)
            method = getattr(self, method_name, None)
            if method is not None:
                method()

    def _safe_call(self, method, *args: Any) -> None:
        """Call a lifecycle method with error recovery.

        Does nothing once the node is flagged with a script error. With
        ``Node.strict_errors`` set, exceptions propagate; otherwise the node is
        flagged, the traceback is printed and logged, and
        ``script_error_raised`` is emitted. ``AssertionError`` always propagates.
        """
        if self._script_error:
            return
        try:
            method(*args)
        except AssertionError:
            raise
        except Exception:
            if Node.strict_errors:
                raise
            self._script_error = True
            import sys
            import traceback

            tb = traceback.format_exc()
            # Always print to stderr so errors are never invisible
            print(f"Script error in {self.name}.{method.__name__}: node disabled:\n{tb}", file=sys.stderr)
            log.error("Script error in %s.%s: node disabled", self.name, method.__name__)
            try:
                Node.script_error_raised.emit(self, method.__name__, tb)
            except Exception:
                # justified: signal-handler errors must not derail error recovery itself
                pass

    def add_child(self, node: Node) -> Node:
        """Add a node as a child, reparenting it if already in a tree.

        Args:
            node: The node to add. Removed from its current parent first.

        Returns:
            The node that was added.

        Raises:
            ValueError: ``node`` is ``self`` or one of ``self``'s ancestors --
                either would create a cycle in the scene tree.
        """
        if node is self:
            raise ValueError(f"Cannot add node {node.name!r} as child of itself")
        ancestor = self.parent
        while ancestor is not None:
            if ancestor is node:
                raise ValueError(
                    f"Cannot reparent {node.name!r} under its descendant "
                    f"{self.name!r}: would create a cycle"
                )
            ancestor = ancestor.parent
        # Identity test: a Node subclass may define __len__/__bool__.
        if node.parent is not None:
            node.parent.remove_child(node)
        node.parent = self
        self.children._add(node)
        if node._is_canvas_layer:
            self._canvas_layer_child_count += 1
        node._notification(Notification.PARENTED)
        node._invalidate_process_mode_cache()
        node._propagate_visibility(self._visible_in_hierarchy)
        if hasattr(node, '_invalidate_transform'):
            node._invalidate_transform()
        if self._tree:
            self._tree._structure_version += 1
            node._enter_tree(self._tree)
            node._ready_recursive()
        return node

    def remove_child(self, node: Node):
        """Remove a child node from this node's children.

        Does nothing when ``node`` is not a child of this node.
        """
        if node in self.children:
            if self._tree:
                self._tree._structure_version += 1
                node._exit_tree()
            self.children._remove(node)
            if node._is_canvas_layer and self._canvas_layer_child_count > 0:
                self._canvas_layer_child_count -= 1
            node._notification(Notification.UNPARENTED)
            node.parent = None
            node._invalidate_process_mode_cache()
            node._propagate_visibility(True)

    def reparent(self, new_parent: Node):
        """Remove from current parent and add to new_parent.

        Delegates to ``new_parent.add_child(self)``, which validates the move
        before detaching this node from its current parent. A rejected move
        therefore leaves the node where it was instead of orphaning it.

        Raises:
            ValueError: ``new_parent`` is this node or one of its descendants.
        """
        new_parent.add_child(self)

    def get_node(self, path: str) -> Node:
        """Navigate tree by path: 'Child/GrandChild' or '../Sibling'.

        A leading ``/`` starts from the root of this node's hierarchy.

        Raises:
            ValueError: A ``..`` segment steps above the root.
        """
        current = self
        parts = [p for p in path.split('/') if p]
        if path.startswith('/'):
            while current.parent:
                current = current.parent
            # An absolute path may optionally name the root as its first segment
            # (e.g. '/Root/Player'). Consume it once so it resolves to root.
            if parts and parts[0] == current.name:
                parts.pop(0)
        for part in parts:
            if part == '..':
                current = current.parent
                if current is None:
                    raise ValueError("Already at root")
            else:
                current = current.children[part]
        return current

    def find_child(self, name: str, recursive: bool = False) -> Node | None:
        """Find first child with the given name."""
        for c in self.children:
            if c.name == name:
                return c
            if recursive:
                found = c.find_child(name, recursive=True)
                if found is not None:
                    return found
        return None

    def find(self, target: type | str, recursive: bool = True) -> Node | None:
        """Find first descendant matching ``target``.

        Dispatches on the argument type:

        - ``find(NodeClass)`` returns the first descendant ``isinstance`` of
          ``NodeClass`` (recursive by default).
        - ``find("name")`` returns the first descendant whose ``name``
          attribute equals the string (recursive by default).

        Recursion is depth-first, pre-order. Returns ``None`` when no match.
        """
        if isinstance(target, str):
            for child in self.children:
                if child.name == target:
                    return child
                if recursive:
                    found = child.find(target, recursive=True)
                    if found is not None:
                        return found
            return None
        for child in self.children:
            if isinstance(child, target):
                return child
            if recursive:
                found = child.find(target, recursive=True)
                if found is not None:
                    return found
        return None

    def find_all(self, node_type: type, recursive: bool = True) -> list:
        """Find all descendants of type."""
        result = []
        for child in self.children:
            if isinstance(child, node_type):
                result.append(child)
            if recursive:
                result.extend(child.find_all(node_type))
        return result

    def walk(self, *, include_self: bool = True) -> Iterator[Node]:
        """Iterate this node and all descendants in DFS pre-order."""
        if include_self:
            yield self
        for child in self.children:
            yield from child.walk(include_self=True)

    @property
    def path(self) -> str:
        """Slash-separated path from the topmost ancestor down to this node."""
        if self.parent is None:
            return f"/{self.name}"
        return f"{self.parent.path}/{self.name}"

    # --- Groups ---

    def add_to_group(self, group: str):
        """Add this node to a named group."""
        self._groups.add(group)
        if self._tree:
            self._tree._group_add(group, self)

    def remove_from_group(self, group: str):
        """Remove this node from a named group."""
        self._groups.discard(group)
        if self._tree:
            self._tree._group_remove(group, self)

    def is_in_group(self, group: str) -> bool:
        """Check if this node belongs to a named group."""
        return group in self._groups

    # --- Lifecycle (override in subclasses) ---

    def on_ready(self) -> None:
        """Called once after the node and all its children enter the scene tree.

        Override to perform initialisation that requires the scene tree --
        finding sibling nodes, connecting signals, spawning children. The
        ``tree`` property is available. Called after ``on_enter_tree()`` and
        after all children's ``on_ready()``.

        Decorate other methods with ``@on_ready`` to register additional ready
        handlers; they fire after the override in declaration order.

        Note:
            Fires again if the node is removed and re-added to the tree.

        Example::

            def on_ready(self):
                self.sprite = self.get_node("Sprite")
                self.health_changed.connect(self._update_hud)
        """

    def on_enter_tree(self) -> None:
        """Called when the node enters the scene tree, before ``on_ready()``.

        Override for setup that must happen the moment the tree reference
        becomes available. Children have not entered yet at this point, so
        avoid querying child nodes here -- use ``on_ready()`` instead.

        Example::

            def on_enter_tree(self):
                self.add_to_group("enemies")
        """

    def on_exit_tree(self) -> None:
        """Called when the node is about to leave the scene tree.

        Override to clean up resources, disconnect external signals, or persist
        state. Children have already exited by the time this fires on the
        parent.

        Example::

            def on_exit_tree(self):
                self.save_progress()
                self.remove_from_group("enemies")
        """

    def on_process(self, dt: float) -> None:
        """Called every frame for game logic.

        Args:
            dt: Seconds elapsed since the previous frame (variable timestep).

        Override for movement, AI, animation triggers, or any per-frame update.
        Obeys ``process_mode`` -- disabled or paused nodes are skipped
        automatically.

        Decorate other methods with ``@on_process`` to register additional
        per-frame handlers; they fire after the override in declaration order.
        For state held while a button is pressed, poll
        ``Input.is_action_pressed("name")`` from inside ``on_process``.

        Example::

            def on_process(self, dt):
                self.position += self.velocity * dt
        """

    def on_physics_process(self, dt: float) -> None:
        """Called at a fixed timestep (default 60 Hz) for physics logic.

        Args:
            dt: Fixed time step in seconds (e.g. 1/60).

        Override for deterministic physics updates -- forces, collision
        responses, rigid-body integration. Runs independently of the render
        frame rate.

        Example::

            def on_physics_process(self, dt):
                self.velocity += self.gravity * dt
                self.move_and_slide()
        """

    def on_draw(self, renderer) -> None:
        """Called each frame for custom 2D drawing.

        Args:
            renderer: The active draw-command recorder (e.g. ``Draw2D``).

        Override to issue immediate-mode draw calls such as ``draw_line``,
        ``draw_rect``, or ``draw_text``. Called only when ``visible`` is
        ``True``.

        Example::

            def on_draw(self, renderer):
                renderer.draw_circle(self.world_position, 10, colour=(1, 0, 0, 1))
        """

    def on_picked(self, event: InputEvent) -> None:
        """Called when a 3D mouse-pick event hits this node's collision shape.

        Args:
            event: The input event containing click position, camera ray, etc.

        Override to react to direct interaction with this 3D object --
        selection, dragging, context menus.

        Example::

            def on_picked(self, event):
                if event.button == MouseButton.LEFT:
                    self.selected = True
        """

    def on_unhandled_input(self, event: TreeInputEvent) -> None:
        """Called for input events that no ``@on_input`` handler consumed.

        Args:
            event: The unhandled input event.

        Use for catch-all bindings such as global debug toggles or pause menus
        that should only fire when no other handler returned a truthy value to
        consume the event. For most input handling use ``@on_input(...)``
        decorators with explicit filters; the dispatch tables route them
        directly without walking the tree.

        Example::

            def on_unhandled_input(self, event):
                if event.key == Key.F3:
                    self.toggle_debug_overlay()
        """

    # --- Coroutine support ---

    def start_coroutine(self, gen: Coroutine) -> CoroutineHandle:
        """Register a generator coroutine to run each frame. Returns a cancellable handle."""
        handle = CoroutineHandle(gen)
        self._coroutines.append(handle)
        return handle

    def stop_coroutine(self, gen_or_handle):
        """Stop and remove a running coroutine (accepts generator or CoroutineHandle)."""
        if isinstance(gen_or_handle, CoroutineHandle):
            gen_or_handle.cancel()
            if gen_or_handle in self._coroutines:
                self._coroutines.remove(gen_or_handle)
            return
        for h in self._coroutines:
            if h._gen is gen_or_handle:
                h.cancel()
                self._coroutines.remove(h)
                return

    def _tick_coroutines(self, dt: float):
        """Advance every registered coroutine by one step.

        A coroutine is primed with ``next()`` on its first tick and receives
        ``dt`` via ``send()`` afterwards. Cancelled and exhausted coroutines
        are dropped once the pass is complete.
        """
        coroutines = self._coroutines
        if not coroutines:
            return
        finished = []
        index = 0
        # Walk the live list by index instead of with a ``for`` loop: a
        # coroutine may call stop_coroutine() while it runs, which shifts the
        # list and would make a plain iterator skip the following entry for
        # this frame. Handles appended during the pass are still reached.
        while index < len(coroutines):
            handle = coroutines[index]
            if handle.is_cancelled:
                finished.append(handle)
            else:
                gen = handle._gen
                try:
                    if handle._primed:
                        gen.send(dt)
                    else:
                        next(gen)
                        handle._primed = True
                except StopIteration:
                    finished.append(handle)
            # Advance only if this slot still holds the handle just ticked;
            # otherwise a removal moved the next entry into the current slot.
            if index < len(coroutines) and coroutines[index] is handle:
                index += 1
        for handle in finished:
            # stop_coroutine() invoked from inside the coroutine may have already removed it.
            if handle in coroutines:
                coroutines.remove(handle)

    # --- Tree internals ---

    def _enter_tree(self, tree: SceneTree):
        """Attach this node and its subtree to ``tree`` and fire enter hooks."""
        self._tree = tree
        # Instantiate declared Child descriptors
        declared = getattr(type(self), '_declared_children', None)
        if declared:
            for attr_name, child_desc in declared.items():
                if self.__dict__.get(attr_name) is None:
                    kwargs = dict(child_desc._kwargs)
                    if 'name' not in kwargs:
                        kwargs['name'] = attr_name
                    node = child_desc._type(*child_desc._args, **kwargs)
                    self.__dict__[attr_name] = node
                    self.children._add(node)
                    node.parent = self
                    # Keep the same bookkeeping add_child() maintains, so the
                    # ``_draw_recursive`` fast path and cached hierarchy
                    # visibility stay correct for declared children too.
                    if node._is_canvas_layer:
                        self._canvas_layer_child_count += 1
                    node._propagate_visibility(self._visible_in_hierarchy)
        if self.unique_name:
            tree._unique_nodes[self.name] = self
        for group in self._groups:
            tree._group_add(group, self)
        self._notification(Notification.ENTER_TREE)
        for method_name in type(self)._simvx_hooks.get("enter_tree", ()):
            self._safe_call(getattr(self, method_name))
        # Register @on_input handlers with the tree's dispatch tables.
        if type(self)._simvx_input_handlers:
            tree._register_input_node(self)
        for child in self.children:
            child._enter_tree(tree)

    def _exit_tree(self):
        """Detach this node and its subtree from the tree (children first)."""
        for child in self.children:
            child._exit_tree()
        self._notification(Notification.EXIT_TREE)
        for method_name in type(self)._simvx_hooks.get("exit_tree", ()):
            self._safe_call(getattr(self, method_name))
        # Close any in-flight coroutines so their ``finally:`` blocks run
        # (releases signal-handler subscriptions, restores transforms, etc.).
        # Without this, ``wait_signal`` lambdas stay attached to the signal
        # for the lifetime of the emitter: a slow leak on scene churn.
        if self._coroutines:
            for handle in self._coroutines:
                try:
                    handle._gen.close()
                except Exception:
                    log.exception("Coroutine close raised on node exit for %r", self)
            self._coroutines.clear()
        if self._tree:
            if type(self)._simvx_input_handlers:
                self._tree._unregister_input_node(self)
            if self.unique_name:
                self._tree._unique_nodes.pop(self.name, None)
            for group in self._groups:
                self._tree._group_remove(group, self)
        self._tree = None

    def _ready_recursive(self):
        """Fire READY and the ``ready`` hooks bottom-up (children before self)."""
        for child in self.children:
            child._ready_recursive()
        self._notification(Notification.READY)
        for method_name in type(self)._simvx_hooks.get("ready", ()):
            self._safe_call(getattr(self, method_name))

    def _effective_process_mode(self) -> ProcessMode:
        """Resolve INHERIT by walking up the tree (cached)."""
        cached = self._cached_process_mode
        if cached is not None:
            return cached
        mode = self._process_mode
        if mode == ProcessMode.INHERIT:
            mode = self.parent._effective_process_mode() if self.parent else ProcessMode.PAUSABLE
        self._cached_process_mode = mode
        return mode

    def _can_process(self, paused: bool) -> bool:
        """Check if this node should process given the tree's pause state."""
        mode = self._effective_process_mode()
        if mode == ProcessMode.DISABLED:
            return False
        if mode == ProcessMode.ALWAYS:
            return True
        if mode == ProcessMode.PAUSED_ONLY:
            return paused
        # PAUSABLE (or resolved INHERIT → PAUSABLE)
        return not paused

    def _process_recursive(self, dt: float, paused: bool = False):
        """Run ``process`` hooks and coroutines for this node, then recurse."""
        if self._script_error:
            return
        # Inlined _can_process: resolve mode from cache and check pause state
        mode = self._cached_process_mode
        if mode is None:
            mode = self._effective_process_mode()
        if mode != ProcessMode.DISABLED and (
            mode == ProcessMode.ALWAYS or (not paused if mode == ProcessMode.PAUSABLE else paused)
        ):
            self._notification(Notification.PROCESS)
            handlers = type(self)._simvx_hooks.get("process", ())
            for method_name in handlers:
                try:
                    getattr(self, method_name)(dt)
                except AssertionError:
                    raise
                except Exception:
                    if Node.strict_errors:
                        raise
                    self._script_error = True
                    import sys
                    import traceback

                    tb = traceback.format_exc()
                    print(f"Script error in {self.name}.{method_name}: node disabled:\n{tb}", file=sys.stderr)
                    log.error("Script error in %s.%s: node disabled", self.name, method_name)
                    try:
                        Node.script_error_raised.emit(self, method_name, tb)
                    except Exception:
                        # justified: signal-handler error during process; node already disabled, recursion continues
                        pass
                    return
            if self._coroutines:
                self._tick_coroutines(dt)
        for child in self.children.safe_iter():
            child._process_recursive(dt, paused)

    def _physics_process_recursive(self, dt: float, paused: bool = False):
        """Run ``physics_process`` hooks for this node, then recurse."""
        if self._script_error:
            return
        # Inlined _can_process: resolve mode from cache and check pause state
        mode = self._cached_process_mode
        if mode is None:
            mode = self._effective_process_mode()
        if mode != ProcessMode.DISABLED and (
            mode == ProcessMode.ALWAYS or (not paused if mode == ProcessMode.PAUSABLE else paused)
        ):
            self._notification(Notification.PHYSICS_PROCESS)
            handlers = type(self)._simvx_hooks.get("physics_process", ())
            for method_name in handlers:
                try:
                    getattr(self, method_name)(dt)
                except AssertionError:
                    raise
                except Exception:
                    if Node.strict_errors:
                        raise
                    self._script_error = True
                    import sys
                    import traceback

                    tb = traceback.format_exc()
                    print(f"Script error in {self.name}.{method_name}: node disabled:\n{tb}", file=sys.stderr)
                    log.error("Script error in %s.%s: node disabled", self.name, method_name)
                    try:
                        Node.script_error_raised.emit(self, method_name, tb)
                    except Exception:
                        # justified: signal-handler error during physics_process; node already disabled, recursion continues
                        pass
                    return
        for child in self.children.safe_iter():
            child._physics_process_recursive(dt, paused)

    def _draw_recursive(self, renderer):
        """Draw this node and its subtree, ordering CanvasLayer children by layer."""
        if not self.visible:
            return
        if self._script_error:
            # Skip this node's own handlers but keep drawing its children.
            for child in self.children.safe_iter():
                child._draw_recursive(renderer)
            return
        self._draw_dispatch(renderer)
        # Fast path: ``_canvas_layer_child_count`` is maintained wherever
        # children are attached/detached, so the zero-CanvasLayer case skips
        # the sort/partition entirely (and avoids the per-frame
        # ``from .nodes_2d.canvas import CanvasLayer`` lookup the old check forced).
        if self._canvas_layer_child_count == 0:
            for child in self.children.safe_iter():
                child._draw_recursive(renderer)
            return
        # Sort CanvasLayer children by their ``layer`` rank (negative below
        # self.on_draw handlers, positive after non-layer children) so HUD
        # layers render above world content regardless of tree-add order.
        # This matches Node2D._draw_recursive's z-ordering scheme.
        non_layers = []
        canvas_layers = []
        for c in self.children.safe_iter():
            (canvas_layers if c._is_canvas_layer else non_layers).append(c)
        canvas_layers.sort(key=lambda c: c.layer)
        # Negative-layer CanvasLayers draw before non-layer siblings; zero/positive after.
        for layer in canvas_layers:
            if layer.layer < 0:
                layer._draw_recursive(renderer)
        for child in non_layers:
            child._draw_recursive(renderer)
        for layer in canvas_layers:
            if layer.layer >= 0:
                layer._draw_recursive(renderer)

    def _draw_dispatch(self, renderer):
        """Invoke all ``on_draw`` handlers (override + decorated) for this node.

        Used by ``_draw_recursive`` and by Control/Node2D subclasses that wrap
        drawing with caching, transforms, or clipping but still want to fire
        the same ordered set of handlers.
        """
        for method_name in type(self)._simvx_hooks.get("draw", ()):
            self._safe_call(getattr(self, method_name), renderer)

    def clear_children(self):
        """Destroy all children of this node."""
        for child in list(self.children):
            child.destroy()

    def destroy(self):
        """Schedule this node for removal at the end of the current frame.

        Signal connections made through this node's bound methods are
        proactively disconnected so emitters stop dispatching to it on the next
        emit (Godot 4 behaviour). Lazy weak-ref cleanup in ``Signal.__call__``
        covers nodes that are GC'd without ``destroy()``.
        """
        for conn in list(self._outgoing_connections):
            conn.disconnect()
        self._outgoing_connections.clear()
        if self._tree:
            self._tree._queue_delete(self)

    @property
    def app(self):
        """The App running this node's scene tree. Available after enter_tree()."""
        return self._tree.app if self._tree else None

    @property
    def tree(self) -> SceneTree | None:
        """The SceneTree this node belongs to, or ``None`` while detached."""
        return self._tree

    def __getitem__(self, key: str):
        """Shorthand for get_node: ``self["Child/Path"]``."""
        return self.get_node(key)

    @classmethod
    def get_properties(cls) -> dict[str, Property]:
        """Return all Property descriptors declared on this node class and its bases."""
        return getattr(cls, '__properties__', {})

    def __repr__(self):
        return f"<{type(self).__name__} '{self.name}'>"
# ``__init_subclass__`` only runs for subclasses, so the base class has to be
# added to the registry by hand.
Node._registry[Node.__name__] = Node

# The base class defines its primary lifecycle hooks directly (no decorators),
# so collect them here; bare ``Node`` instances then dispatch through the same
# hook tables as subclasses.
Node._simvx_hooks, Node._simvx_input_handlers = collect_hooks(Node, Node._PRIMARY_HOOK_METHODS)


# ============================================================================
# Timer
# ============================================================================
class Timer(Node):
    """Fires timeout signal after duration. Supports one-shot and repeating.

    Attributes:
        duration: Interval in seconds between start and ``timeout``.
        one_shot: When true the timer stops after firing once; otherwise it
            re-arms itself for another ``duration``.
        autostart: Whether the timer was created already running.
        timeout: Signal emitted each time the interval elapses.
    """

    duration = Property(1.0, range=(0.001, 3600))
    one_shot = Property(True)
    autostart = Property(False)

    def __init__(self, duration: float = 1.0, one_shot: bool = True, autostart: bool = False, **kwargs):
        super().__init__(**kwargs)
        self.duration = duration
        self.one_shot = one_shot
        # Store the flag on the Property as well; previously only the private
        # running state reflected it and ``timer.autostart`` stayed False.
        self.autostart = autostart
        self.timeout = Signal()
        self._time_left = duration if autostart else 0.0
        self._running = autostart

    def start(self, duration: float = 0):
        """Start or restart the timer, optionally overriding duration.

        Args:
            duration: New interval in seconds; values <= 0 keep the current one.
        """
        if duration > 0:
            self.duration = duration
        self._time_left = self.duration
        self._running = True

    def stop(self):
        """Stop the timer and reset time_left to zero."""
        self._running = False
        self._time_left = 0.0

    @property
    def stopped(self) -> bool:
        """``True`` while the timer is not counting down."""
        return not self._running

    @property
    def time_left(self) -> float:
        """Seconds remaining until the next ``timeout``."""
        return self._time_left

    def on_process(self, dt: float):
        """Count down by ``dt`` and emit ``timeout`` when the interval elapses."""
        if not self._running:
            return
        self._time_left -= dt
        if self._time_left > 0:
            return
        # Settle the timer's own state BEFORE emitting, so a handler that calls
        # start() or stop() has the final word instead of being overwritten by
        # the bookkeeping below.
        if self.one_shot:
            self._running = False
            self._time_left = 0.0  # do not report a negative overshoot
        else:
            self._time_left += self.duration
        self.timeout()