"""Node: Base node class with tree hierarchy, groups, and coroutine support."""
import ast
import functools
import inspect
import logging
import textwrap
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, ClassVar
from .decorators import collect_hooks
from .descriptors import Children, Coroutine, CoroutineHandle, Notification, ProcessMode, Property
from .signals import Signal
from .events import InputEvent, TreeInputEvent
log = logging.getLogger(__name__)
if TYPE_CHECKING:
from .scene_tree import SceneTree
# Lazy-cached reference for circular import
_ui_Control: type | None = None
def _get_control() -> type:
global _ui_Control
if _ui_Control is None:
from .ui import Control
_ui_Control = Control
return _ui_Control
def _init_calls_super(func) -> bool:
"""Check via AST whether *func* contains a super().__init__(...) call."""
try:
source = textwrap.dedent(inspect.getsource(func))
tree = ast.parse(source)
except (OSError, TypeError, IndentationError):
return True # Cannot inspect: assume user handles super
for node in ast.walk(tree):
if not isinstance(node, ast.Call):
continue
f = node.func
# super().__init__(...) or super(Cls, self).__init__(...)
if isinstance(f, ast.Attribute) and f.attr == "__init__" and isinstance(f.value, ast.Call):
inner = f.value
if isinstance(inner.func, ast.Name) and inner.func.id == "super":
return True
return False
[docs]
class Node:
"""Base node with tree hierarchy, groups, and coroutine support.
Attributes:
name: Unique name within the parent's children. Defaults to the class name.
parent: The parent ``Node``, or ``None`` if this is the root.
children: Ordered collection of child nodes, accessible by name or index.
visible: Whether this node (and its descendants) should be drawn.
process_mode: Controls processing behaviour during pause
(``INHERIT``, ``PAUSABLE``, ``PAUSED_ONLY``, ``ALWAYS``, ``DISABLED``).
script: Optional file path to an attached script.
unique_name: When ``True``, the node is registered in the tree for
fast lookup via ``SceneTree.get_unique_node()``.
Example::
root = Node(name="Root")
child = Node(name="Child")
root.add_child(child)
assert child.parent is root
assert root.children["Child"] is child
"""
_registry: ClassVar[dict[str, type]] = {}
strict_errors: ClassVar[bool] = True # Raise on script errors; set False for release
script_error_raised = Signal() # emits (node, method_name, traceback_str)
# Engine kwargs consumed by Node.__init__, not forwarded to user __init__ unless explicitly accepted
_NODE_INIT_KWARGS = frozenset({"name"})
# Filled by __init_subclass__: hook name -> tuple of method names to invoke per dispatch.
_simvx_hooks: ClassVar[dict[str, tuple[str, ...]]] = {}
# Filled by __init_subclass__: ordered tuple of (method_name, filter_dict) for input handlers.
_simvx_input_handlers: ClassVar[tuple[tuple[str, dict[str, Any]], ...]] = ()
_PRIMARY_HOOK_METHODS: ClassVar[tuple[str, ...]] = (
"on_ready", "on_process", "on_physics_process",
"on_enter_tree", "on_exit_tree", "on_draw", "on_picked",
"on_unhandled_input",
)
# Bare names commonly mistaken for SimVX hooks (the engine only dispatches the on_-prefixed forms).
_BARE_HOOK_NAMES: ClassVar[tuple[str, ...]] = (
"ready", "process", "physics_process", "draw", "input",
)
[docs]
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
Node._registry[cls.__name__] = cls
# Lint: catch bare hook names (e.g. `def ready(self)`) that the engine silently never invokes.
for name in cls._BARE_HOOK_NAMES:
if name in cls.__dict__ and callable(cls.__dict__[name]):
raise TypeError(
f"{cls.__name__}.{name}: not a SimVX hook: did you mean 'on_{name}'?"
)
# Collect lifecycle and input handlers (decorated + same-named overrides)
# walking the MRO most-derived-last, mirroring Property.__set_name__.
cls._simvx_hooks, cls._simvx_input_handlers = collect_hooks(cls, cls._PRIMARY_HOOK_METHODS)
# Auto-super: wrap user __init__ that doesn't call super().__init__
if "__init__" not in cls.__dict__:
return # No custom __init__: nothing to wrap
if cls.__dict__.get("__auto_init__") is False:
return # Opted out
user_init = cls.__dict__["__init__"]
if _init_calls_super(user_init):
return # User handles super(): don't wrap
user_sig = inspect.signature(user_init)
user_params = user_sig.parameters
# Determine which params (beyond self) the user accepts
user_param_names = [n for n in user_params if n != "self"]
has_var_keyword = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in user_params.values())
has_var_positional = any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in user_params.values())
# Positional param names (POSITIONAL_ONLY or POSITIONAL_OR_KEYWORD), in order
positional_names = [
n for n, p in user_params.items()
if n != "self" and p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
]
# Find the parent __init__ to call (the next in MRO that is not the user's)
parent_init = None
for base in cls.__mro__[1:]:
if "__init__" in base.__dict__:
parent_init = base.__dict__["__init__"]
break
if parent_init is None:
parent_init = Node.__init__
# Inspect parent __init__ signature to know what it accepts.
# follow_wrapped=False gets the actual wrapper signature (not the original
# user function's signature that functools.wraps copies).
try:
parent_sig = inspect.signature(parent_init, follow_wrapped=False)
parent_param_names = {n for n in parent_sig.parameters if n != "self"}
parent_has_var_kw = any(
p.kind == inspect.Parameter.VAR_KEYWORD for p in parent_sig.parameters.values()
)
except (ValueError, TypeError):
parent_param_names = set()
parent_has_var_kw = True # Assume flexible
has_on_change = getattr(cls, '_has_on_change_hooks', False)
@functools.wraps(user_init)
def _wrapped_init(self, *args, **all_kwargs):
# Map positional args to their parameter names so we can split by name
user_args = args
if args and not has_var_positional:
for i, val in enumerate(args):
if i < len(positional_names):
pname = positional_names[i]
if pname in all_kwargs:
raise TypeError(f"__init__() got multiple values for argument '{pname}'")
all_kwargs[pname] = val
user_args = () # All positional mapped to kwargs
# Split kwargs: parent_kw goes to parent __init__, user_fwd goes to user's __init__
props = cls.get_properties()
parent_kw = {}
user_fwd = {}
for k, v in all_kwargs.items():
is_user_param = k in user_param_names
is_parent_named = k in parent_param_names # Explicitly named in parent sig
is_prop = k in props
is_engine = k in Node._NODE_INIT_KWARGS
if is_user_param:
user_fwd[k] = v
if is_parent_named or is_prop or is_engine:
parent_kw[k] = v # Also pass to parent (e.g. 'name', properties)
elif is_parent_named or is_prop or is_engine:
parent_kw[k] = v # Known parent/engine/property kwarg
elif has_var_keyword:
user_fwd[k] = v # Unknown kwarg, user accepts **kwargs
elif parent_has_var_kw:
parent_kw[k] = v # Fallback to parent **kwargs (will warn)
else:
raise TypeError(f"{cls.__name__}.__init__() got unexpected keyword argument {k!r}")
we_set_init_scope = has_on_change and not getattr(self, '_on_change_init', False)
if we_set_init_scope:
self._on_change_init = True
self._on_change_pending = []
try:
# Initialise via parent chain (e.g. Node2D.__init__ -> Node.__init__)
parent_init(self, **parent_kw)
# Forward to user's __init__
if has_var_keyword or has_var_positional:
user_init(self, *user_args, **user_fwd)
elif user_param_names:
forward = {k: v for k, v in user_fwd.items() if k in user_param_names}
user_init(self, *user_args, **forward)
else:
user_init(self)
finally:
if we_set_init_scope:
self._on_change_init = False
self._flush_on_change_pending()
cls.__init__ = _wrapped_init
# Class-level marker that lets ``_draw_recursive`` cheaply detect CanvasLayer
# children without importing the subclass (which lives in a sibling module
# and would otherwise force a per-frame ``from ... import CanvasLayer``).
# ``CanvasLayer`` overrides this to ``True``.
_is_canvas_layer: bool = False
def __init__(self, name: str = "", **kwargs):
if name and not isinstance(name, str):
raise TypeError(f"Node name must be a string, got {type(name).__name__}")
self._name = name or type(self).__name__
self.parent: Node | None = None
self.children = Children()
# Count of direct children whose ``_is_canvas_layer`` flag is True.
# Maintained by add_child / remove_child so ``_draw_recursive`` can
# skip the sort/partition fast path when zero.
self._canvas_layer_child_count: int = 0
self._tree: SceneTree | None = None
self._coroutines: list[Coroutine] = []
self._groups: set[str] = set()
self._scene_template_path: str | None = None
self.script: str | None = None
self._script_embedded: str | None = None # source stored in scene, importable
self._script_module = None # ModuleType | None: cached loaded module
self._script_original_class: type | None = None # original class before script swap
self._visible: bool = True
self._visible_in_hierarchy: bool = True
self._process_mode: ProcessMode = ProcessMode.INHERIT
self._cached_process_mode: ProcessMode | None = None # cached resolved mode
self.unique_name: bool = False
self._script_error: bool = False
self._outgoing_connections: list = [] # signals connected via this node's bound methods
# Open the on_change deferral scope if this class has any on_change
# hooks AND no enclosing init wrapper has already opened one. The flag
# set here is cleared in the matching `finally` after kwargs are applied.
we_set_init_scope = (
getattr(type(self), '_has_on_change_hooks', False)
and not getattr(self, '_on_change_init', False)
)
if we_set_init_scope:
self._on_change_init = True
self._on_change_pending = []
try:
# Apply Property values passed as kwargs
props = self.get_properties()
for key, val in kwargs.items():
if key in props:
setattr(self, key, val)
else:
raise TypeError(f"{type(self).__name__}: unknown kwarg {key!r}")
finally:
if we_set_init_scope:
self._on_change_init = False
self._flush_on_change_pending()
@property
def name(self) -> str:
return self._name
[docs]
@name.setter
def name(self, value: str):
old = self._name
self._name = value
parent = getattr(self, 'parent', None)
if parent is not None:
names = parent.children._names
if old and names.get(old) is self:
del names[old]
if value:
names[value] = self
@property
def process_mode(self) -> ProcessMode:
return self._process_mode
[docs]
@process_mode.setter
def process_mode(self, value: ProcessMode):
self._process_mode = value
self._invalidate_process_mode_cache()
@property
def visible(self) -> bool:
return self._visible
[docs]
@visible.setter
def visible(self, value: bool):
value = bool(value)
if value == self._visible:
return
self._visible = value
parent_effective = True if self.parent is None else self.parent._visible_in_hierarchy
self._propagate_visibility(parent_effective)
self._notification(Notification.VISIBILITY_CHANGED)
def _propagate_visibility(self, parent_effective: bool) -> None:
"""Update _visible_in_hierarchy for self and descendants.
Propagates the effective visibility down the subtree; prunes branches
whose effective state didn't change so toggles cost O(changed-subtree)
rather than O(whole-subtree).
"""
new_effective = parent_effective and self._visible
if self._visible_in_hierarchy == new_effective:
return
self._visible_in_hierarchy = new_effective
for child in self.children:
child._propagate_visibility(new_effective)
def _invalidate_process_mode_cache(self):
"""Clear cached process mode for this node and descendants that inherit."""
self._cached_process_mode = None
for child in self.children:
if child._process_mode == ProcessMode.INHERIT:
child._invalidate_process_mode_cache()
def _notification(self, what: Notification) -> None:
"""Called when a notification is dispatched. Override to handle."""
[docs]
def reset_error(self) -> None:
"""Clear script error flag to re-enable processing."""
self._script_error = False
def _flush_on_change_pending(self) -> None:
"""Dispatch on_change hooks queued during ``__init__``.
Hooks are deduplicated by ``(property_attr, method_name)`` so multiple
sets of the same property during construction fire the hook once. Order
of first occurrence is preserved.
"""
pending = getattr(self, '_on_change_pending', None)
if not pending:
return
self._on_change_pending = []
seen: set[tuple[str, str]] = set()
for prop_attr, method_name in pending:
key = (prop_attr, method_name)
if key in seen:
continue
seen.add(key)
method = getattr(self, method_name, None)
if method is not None:
method()
def _safe_call(self, method, *args: Any) -> None:
"""Call a lifecycle method with error recovery."""
if self._script_error:
return
try:
method(*args)
except AssertionError:
raise
except Exception:
if Node.strict_errors:
raise
self._script_error = True
import sys
import traceback
tb = traceback.format_exc()
# Always print to stderr so errors are never invisible
print(f"Script error in {self.name}.{method.__name__}: node disabled:\n{tb}", file=sys.stderr)
log.error("Script error in %s.%s: node disabled", self.name, method.__name__)
try:
Node.script_error_raised.emit(self, method.__name__, tb)
except Exception:
# justified: signal-handler errors must not derail error recovery itself
pass
[docs]
def add_child(self, node: Node) -> Node:
"""Add a node as a child, reparenting it if already in a tree.
Args:
node: The node to add. Removed from its current parent first.
Raises:
ValueError: ``node`` is ``self`` or one of ``self``'s ancestors --
either would create a cycle in the scene tree.
"""
if node is self:
raise ValueError(f"Cannot add node {node.name!r} as child of itself")
ancestor = self.parent
while ancestor is not None:
if ancestor is node:
raise ValueError(
f"Cannot reparent {node.name!r} under its descendant "
f"{self.name!r}: would create a cycle"
)
ancestor = ancestor.parent
if node.parent:
node.parent.remove_child(node)
node.parent = self
self.children._add(node)
if node._is_canvas_layer:
self._canvas_layer_child_count += 1
node._notification(Notification.PARENTED)
node._invalidate_process_mode_cache()
node._propagate_visibility(self._visible_in_hierarchy)
if hasattr(node, '_invalidate_transform'):
node._invalidate_transform()
if self._tree:
self._tree._structure_version += 1
node._enter_tree(self._tree)
node._ready_recursive()
return node
[docs]
def remove_child(self, node: Node):
"""Remove a child node from this node's children."""
if node in self.children:
if self._tree:
self._tree._structure_version += 1
node._exit_tree()
self.children._remove(node)
if node._is_canvas_layer and self._canvas_layer_child_count > 0:
self._canvas_layer_child_count -= 1
node._notification(Notification.UNPARENTED)
node.parent = None
node._invalidate_process_mode_cache()
node._propagate_visibility(True)
[docs]
def reparent(self, new_parent: Node):
"""Remove from current parent and add to new_parent."""
if self.parent:
self.parent.remove_child(self)
new_parent.add_child(self)
[docs]
def get_node(self, path: str) -> Node:
"""Navigate tree by path: 'Child/GrandChild' or '../Sibling'."""
current = self
parts = [p for p in path.split('/') if p]
if path.startswith('/'):
while current.parent:
current = current.parent
# An absolute path may optionally name the root as its first segment
# (e.g. '/Root/Player'). Consume it once so it resolves to root.
if parts and parts[0] == current.name:
parts.pop(0)
for part in parts:
if part == '..':
current = current.parent
if current is None:
raise ValueError("Already at root")
else:
current = current.children[part]
return current
[docs]
def find_child(self, name: str, recursive: bool = False) -> Node | None:
"""Find first child with the given name."""
for c in self.children:
if c.name == name:
return c
if recursive:
found = c.find_child(name, recursive=True)
if found:
return found
return None
[docs]
def find(self, target: type | str, recursive: bool = True) -> Node | None:
"""Find first descendant matching ``target``.
Dispatches on the argument type:
- ``find(NodeClass)`` returns the first descendant ``isinstance`` of
``NodeClass`` (recursive by default).
- ``find("name")`` returns the first descendant whose ``name`` attribute
equals the string (recursive by default).
Recursion is depth-first, pre-order. Returns ``None`` when no match.
"""
if isinstance(target, str):
for child in self.children:
if child.name == target:
return child
if recursive:
found = child.find(target, recursive=True)
if found:
return found
return None
for child in self.children:
if isinstance(child, target):
return child
if recursive:
found = child.find(target, recursive=True)
if found:
return found
return None
[docs]
def find_all(self, node_type: type, recursive: bool = True) -> list:
"""Find all descendants of type."""
result = []
for child in self.children:
if isinstance(child, node_type):
result.append(child)
if recursive:
result.extend(child.find_all(node_type))
return result
[docs]
def walk(self, *, include_self: bool = True) -> Iterator[Node]:
"""Iterate this node and all descendants in DFS pre-order."""
if include_self:
yield self
for child in self.children:
yield from child.walk(include_self=True)
[docs]
@property
def path(self) -> str:
if self.parent is None:
return f"/{self.name}"
return f"{self.parent.path}/{self.name}"
# --- Groups ---
[docs]
def add_to_group(self, group: str):
"""Add this node to a named group."""
self._groups.add(group)
if self._tree:
self._tree._group_add(group, self)
[docs]
def remove_from_group(self, group: str):
"""Remove this node from a named group."""
self._groups.discard(group)
if self._tree:
self._tree._group_remove(group, self)
[docs]
def is_in_group(self, group: str) -> bool:
"""Check if this node belongs to a named group."""
return group in self._groups
# --- Lifecycle (override in subclasses) ---
[docs]
def on_ready(self) -> None:
"""Called once after the node and all its children enter the scene tree.
Override to perform initialisation that requires the scene tree --
finding sibling nodes, connecting signals, spawning children. The
``tree`` property is available. Called after ``on_enter_tree()`` and
after all children's ``on_ready()``.
Decorate other methods with ``@on_ready`` to register additional
ready handlers; they fire after the override in declaration order.
Note:
Fires again if the node is removed and re-added to the tree.
Example::
def on_ready(self):
self.sprite = self.get_node("Sprite")
self.health_changed.connect(self._update_hud)
"""
[docs]
def on_enter_tree(self) -> None:
"""Called when the node enters the scene tree, before ``on_ready()``.
Override for setup that must happen the moment the tree reference
becomes available. Children have not entered yet at this point, so
avoid querying child nodes here -- use ``on_ready()`` instead.
Example::
def on_enter_tree(self):
self.add_to_group("enemies")
"""
[docs]
def on_exit_tree(self) -> None:
"""Called when the node is about to leave the scene tree.
Override to clean up resources, disconnect external signals, or
persist state. Children have already exited by the time this fires
on the parent.
Example::
def on_exit_tree(self):
self.save_progress()
self.remove_from_group("enemies")
"""
[docs]
def on_process(self, dt: float) -> None:
"""Called every frame for game logic.
Args:
dt: Seconds elapsed since the previous frame (variable timestep).
Override for movement, AI, animation triggers, or any per-frame
update. Obeys ``process_mode`` -- disabled or paused nodes are
skipped automatically.
Decorate other methods with ``@on_process`` to register additional
per-frame handlers; they fire after the override in declaration
order. For state held while a button is pressed, poll
``Input.is_action_pressed("name")`` from inside ``on_process``.
Example::
def on_process(self, dt):
self.position += self.velocity * dt
"""
[docs]
def on_physics_process(self, dt: float) -> None:
"""Called at a fixed timestep (default 60 Hz) for physics logic.
Args:
dt: Fixed time step in seconds (e.g. 1/60).
Override for deterministic physics updates -- forces, collision
responses, rigid-body integration. Runs independently of the
render frame rate.
Example::
def on_physics_process(self, dt):
self.velocity += self.gravity * dt
self.move_and_slide()
"""
[docs]
def on_draw(self, renderer) -> None:
"""Called each frame for custom 2D drawing.
Args:
renderer: The active draw-command recorder (e.g. ``Draw2D``).
Override to issue immediate-mode draw calls such as ``draw_line``,
``draw_rect``, or ``draw_text``. Called only when ``visible`` is
``True``.
Example::
def on_draw(self, renderer):
renderer.draw_circle(self.world_position, 10, colour=(1, 0, 0, 1))
"""
[docs]
def on_picked(self, event: InputEvent) -> None:
"""Called when a 3D mouse-pick event hits this node's collision shape.
Args:
event: The input event containing click position, camera ray, etc.
Override to react to direct interaction with this 3D object --
selection, dragging, context menus.
Example::
def on_picked(self, event):
if event.button == MouseButton.LEFT:
self.selected = True
"""
# --- Coroutine support ---
[docs]
def start_coroutine(self, gen: Coroutine) -> CoroutineHandle:
"""Register a generator coroutine to run each frame. Returns a cancellable handle."""
handle = CoroutineHandle(gen)
self._coroutines.append(handle)
return handle
[docs]
def stop_coroutine(self, gen_or_handle):
"""Stop and remove a running coroutine (accepts generator or CoroutineHandle)."""
if isinstance(gen_or_handle, CoroutineHandle):
gen_or_handle.cancel()
if gen_or_handle in self._coroutines:
self._coroutines.remove(gen_or_handle)
return
for h in self._coroutines:
if h._gen is gen_or_handle:
h.cancel()
self._coroutines.remove(h)
return
def _tick_coroutines(self, dt: float):
if not self._coroutines:
return
finished = []
for handle in self._coroutines:
if handle.is_cancelled:
finished.append(handle)
continue
gen = handle._gen
try:
if handle._primed:
gen.send(dt)
else:
next(gen)
handle._primed = True
except StopIteration:
finished.append(handle)
for handle in finished:
# stop_coroutine() invoked from inside the coroutine may have already removed it.
if handle in self._coroutines:
self._coroutines.remove(handle)
# --- Tree internals ---
def _enter_tree(self, tree: SceneTree):
self._tree = tree
# Instantiate declared Child descriptors
declared = getattr(type(self), '_declared_children', None)
if declared:
for attr_name, child_desc in declared.items():
if self.__dict__.get(attr_name) is None:
kwargs = dict(child_desc._kwargs)
if 'name' not in kwargs:
kwargs['name'] = attr_name
node = child_desc._type(*child_desc._args, **kwargs)
self.__dict__[attr_name] = node
self.children._add(node)
node.parent = self
if self.unique_name:
tree._unique_nodes[self.name] = self
for group in self._groups:
tree._group_add(group, self)
self._notification(Notification.ENTER_TREE)
for method_name in type(self)._simvx_hooks.get("enter_tree", ()):
self._safe_call(getattr(self, method_name))
# Register @on_input handlers with the tree's dispatch tables.
if type(self)._simvx_input_handlers:
tree._register_input_node(self)
for child in self.children:
child._enter_tree(tree)
def _exit_tree(self):
for child in self.children:
child._exit_tree()
self._notification(Notification.EXIT_TREE)
for method_name in type(self)._simvx_hooks.get("exit_tree", ()):
self._safe_call(getattr(self, method_name))
# Close any in-flight coroutines so their ``finally:`` blocks run
# (releases signal-handler subscriptions, restores transforms, etc.).
# Without this, ``wait_signal`` lambdas stay attached to the signal
# for the lifetime of the emitter: a slow leak on scene churn.
if self._coroutines:
for handle in self._coroutines:
try:
handle._gen.close()
except Exception:
log.exception("Coroutine close raised on node exit for %r", self)
self._coroutines.clear()
if self._tree:
if type(self)._simvx_input_handlers:
self._tree._unregister_input_node(self)
if self.unique_name:
self._tree._unique_nodes.pop(self.name, None)
for group in self._groups:
self._tree._group_remove(group, self)
self._tree = None
def _ready_recursive(self):
for child in self.children:
child._ready_recursive()
self._notification(Notification.READY)
for method_name in type(self)._simvx_hooks.get("ready", ()):
self._safe_call(getattr(self, method_name))
def _effective_process_mode(self) -> ProcessMode:
"""Resolve INHERIT by walking up the tree (cached)."""
cached = self._cached_process_mode
if cached is not None:
return cached
mode = self._process_mode
if mode == ProcessMode.INHERIT:
mode = self.parent._effective_process_mode() if self.parent else ProcessMode.PAUSABLE
self._cached_process_mode = mode
return mode
def _can_process(self, paused: bool) -> bool:
"""Check if this node should process given the tree's pause state."""
mode = self._effective_process_mode()
if mode == ProcessMode.DISABLED:
return False
if mode == ProcessMode.ALWAYS:
return True
if mode == ProcessMode.PAUSED_ONLY:
return paused
# PAUSABLE (or resolved INHERIT → PAUSABLE)
return not paused
def _process_recursive(self, dt: float, paused: bool = False):
if self._script_error:
return
# Inlined _can_process: resolve mode from cache and check pause state
mode = self._cached_process_mode
if mode is None:
mode = self._effective_process_mode()
if mode != ProcessMode.DISABLED and (
mode == ProcessMode.ALWAYS
or (not paused if mode == ProcessMode.PAUSABLE else paused)
):
self._notification(Notification.PROCESS)
handlers = type(self)._simvx_hooks.get("process", ())
for method_name in handlers:
try:
getattr(self, method_name)(dt)
except AssertionError:
raise
except Exception:
if Node.strict_errors:
raise
self._script_error = True
import sys
import traceback
tb = traceback.format_exc()
print(f"Script error in {self.name}.{method_name}: node disabled:\n{tb}", file=sys.stderr)
log.error("Script error in %s.%s: node disabled", self.name, method_name)
try:
Node.script_error_raised.emit(self, method_name, tb)
except Exception:
# justified: signal-handler error during process; node already disabled, recursion continues
pass
return
if self._coroutines:
self._tick_coroutines(dt)
for child in self.children.safe_iter():
child._process_recursive(dt, paused)
def _physics_process_recursive(self, dt: float, paused: bool = False):
if self._script_error:
return
# Inlined _can_process: resolve mode from cache and check pause state
mode = self._cached_process_mode
if mode is None:
mode = self._effective_process_mode()
if mode != ProcessMode.DISABLED and (
mode == ProcessMode.ALWAYS
or (not paused if mode == ProcessMode.PAUSABLE else paused)
):
self._notification(Notification.PHYSICS_PROCESS)
handlers = type(self)._simvx_hooks.get("physics_process", ())
for method_name in handlers:
try:
getattr(self, method_name)(dt)
except AssertionError:
raise
except Exception:
if Node.strict_errors:
raise
self._script_error = True
import sys
import traceback
tb = traceback.format_exc()
print(f"Script error in {self.name}.{method_name}: node disabled:\n{tb}", file=sys.stderr)
log.error("Script error in %s.%s: node disabled", self.name, method_name)
try:
Node.script_error_raised.emit(self, method_name, tb)
except Exception:
# justified: signal-handler error during physics_process; node already disabled, recursion continues
pass
return
for child in self.children.safe_iter():
child._physics_process_recursive(dt, paused)
def _draw_recursive(self, renderer):
if not self.visible:
return
if self._script_error:
for child in self.children.safe_iter():
child._draw_recursive(renderer)
return
self._draw_dispatch(renderer)
# Fast path: ``_canvas_layer_child_count`` is maintained by add_child /
# remove_child, so the zero-CanvasLayer case skips the sort/partition
# entirely (and avoids the per-frame ``from .nodes_2d.canvas import
# CanvasLayer`` lookup the old check forced).
if self._canvas_layer_child_count == 0:
for child in self.children.safe_iter():
child._draw_recursive(renderer)
return
# Sort CanvasLayer children by their ``layer`` rank (negative below
# self.on_draw handlers, positive after non-layer children) so HUD
# layers render above world content regardless of tree-add order.
# This matches Node2D._draw_recursive's z-ordering scheme.
non_layers = []
canvas_layers = []
for c in self.children.safe_iter():
(canvas_layers if c._is_canvas_layer else non_layers).append(c)
canvas_layers.sort(key=lambda c: c.layer)
# Negative-layer CanvasLayers draw before non-layer siblings; zero/positive after.
for layer in canvas_layers:
if layer.layer < 0:
layer._draw_recursive(renderer)
for child in non_layers:
child._draw_recursive(renderer)
for layer in canvas_layers:
if layer.layer >= 0:
layer._draw_recursive(renderer)
def _draw_dispatch(self, renderer):
"""Invoke all ``on_draw`` handlers (override + decorated) for this node.
Used by ``_draw_recursive`` and by Control/Node2D subclasses that
wrap drawing with caching, transforms, or clipping but still want
to fire the same ordered set of handlers.
"""
for method_name in type(self)._simvx_hooks.get("draw", ()):
self._safe_call(getattr(self, method_name), renderer)
[docs]
def clear_children(self):
"""Destroy all children of this node."""
for child in list(self.children):
child.destroy()
[docs]
def destroy(self):
"""Schedule this node for removal at the end of the current frame.
Signal connections made through this node's bound methods are
proactively disconnected so emitters stop dispatching to it on the
next emit (Godot 4 behaviour). Lazy weak-ref cleanup in
``Signal.__call__`` covers nodes that are GC'd without ``destroy()``.
"""
for conn in list(self._outgoing_connections):
conn.disconnect()
self._outgoing_connections.clear()
if self._tree:
self._tree._queue_delete(self)
[docs]
@property
def app(self):
"""The App running this node's scene tree. Available after enter_tree()."""
return self._tree.app if self._tree else None
[docs]
@property
def tree(self) -> SceneTree:
"""The SceneTree this node belongs to."""
return self._tree
[docs]
def __getitem__(self, key: str):
"""Shorthand for get_node: ``self["Child/Path"]``."""
return self.get_node(key)
[docs]
@classmethod
def get_properties(cls) -> dict[str, Property]:
"""Return all Property descriptors declared on this node class and its bases."""
return getattr(cls, '__properties__', {})
[docs]
def __repr__(self):
return f"<{type(self).__name__} '{self.name}'>"
# Register Node itself (not covered by __init_subclass__ which only fires for subclasses)
Node._registry["Node"] = Node
# Collect primary lifecycle hooks defined directly on Node (no decorators on the
# base class) so bare Node instances dispatch through the same code path as
# subclasses.
Node._simvx_hooks, Node._simvx_input_handlers = collect_hooks(Node, Node._PRIMARY_HOOK_METHODS)
# ============================================================================
# Timer
# ============================================================================
[docs]
class Timer(Node):
"""Fires timeout signal after duration. Supports one-shot and repeating."""
duration = Property(1.0, range=(0.001, 3600))
one_shot = Property(True)
autostart = Property(False)
def __init__(self, duration: float = 1.0, one_shot: bool = True,
autostart: bool = False, **kwargs):
super().__init__(**kwargs)
self.duration = duration
self.one_shot = one_shot
self.timeout = Signal()
self._time_left = duration if autostart else 0.0
self._running = autostart
[docs]
def start(self, duration: float = 0):
"""Start or restart the timer, optionally overriding duration."""
if duration > 0:
self.duration = duration
self._time_left = self.duration
self._running = True
[docs]
def stop(self):
"""Stop the timer and reset time_left to zero."""
self._running = False
self._time_left = 0.0
[docs]
@property
def stopped(self) -> bool:
return not self._running
[docs]
@property
def time_left(self) -> float:
return self._time_left
[docs]
def on_process(self, dt: float):
if not self._running:
return
self._time_left -= dt
if self._time_left <= 0:
self.timeout()
if self.one_shot:
self._running = False
else:
self._time_left += self.duration