Source code for simvx.core.testing.scene_runner

"""SceneRunner -- headless scene runner for testing game logic."""

import copy
import logging
from typing import Any

from ..input import Input
from ..node import Node
from ..scene_tree import SceneTree

log = logging.getLogger(__name__)

__all__ = ["SceneRunner"]

[docs] class SceneRunner: """Headless scene runner for testing game logic without rendering. Manages a SceneTree, advances frames, and provides query helpers. """ def __init__(self, screen_size: tuple[float, float] = (800, 600)): self.tree = SceneTree(screen_size=screen_size) self._frame_count = 0 self._dt = 1 / 60 self._physics_dt = 1 / 60 self._time = 0.0 self._draw_renderer = None # Set to a DrawLog for draw validation
[docs] def load(self, root_node: Node) -> SceneRunner: """Load a scene by setting the root node.""" self.tree.set_root(root_node) self._frame_count = 0 self._time = 0.0 return self
[docs] def advance_frames(self, count: int = 1, dt: float | None = None, draw: bool = False) -> SceneRunner: """Advance the scene by N frames (tick + physics_tick each). If *draw* is True, also calls ``tree.render()`` with a mock renderer each frame. This exercises draw() methods and catches errors that would otherwise only surface with a real Vulkan renderer. Drains :class:`InputSimulator` pending-release queue at the start of every iteration so ``sim.tap_key()`` can fire the release on a later frame than the press (preserving the ``is_action_just_pressed`` / ``is_action_just_released`` edges). """ frame_dt = dt or self._dt from .input_sim import InputSimulator for _ in range(count): self.tree.tick(frame_dt) self.tree.physics_tick(self._physics_dt) if draw: from ..ui.testing import DrawLog self.tree.render(self._draw_renderer or DrawLog()) Input._end_frame() Input._new_frame() # Drain queued tap_key releases AFTER the just-press edge has # been consumed by this frame's process pass. The release # writes into ``_keys_just_released_typed`` and survives until # the NEXT _end_frame, so the next iteration's process call # sees ``is_action_just_released`` exactly once. InputSimulator.flush_pending_releases() self._frame_count += 1 self._time += frame_dt return self
[docs] def simulate_until( self, predicate, max_ticks: int = 600, dt: float | None = None, ) -> bool: """Tick the scene until ``predicate(scene)`` returns truthy. Args: predicate: Callable receiving the scene root (or ``None`` if no root) and returning a truthy value when the desired state has been reached. max_ticks: Maximum frames to advance before giving up. Default 600 (10 seconds at 60 FPS). dt: Frame timestep. Defaults to the runner's ``_dt`` (1/60 s). Returns: ``True`` if the predicate was satisfied within ``max_ticks``, ``False`` on timeout. The predicate is evaluated BEFORE the first tick so an already-satisfied state returns ``True`` with zero frames advanced. Replaces the per-test ``for _ in range(N): runner.advance_frames(1); if cond(): break`` boilerplate (You're the OS, PyDew Valley). """ if predicate(self.root): return True for _ in range(max_ticks): self.advance_frames(1, dt=dt) if predicate(self.root): return True return False
[docs] def advance_time(self, seconds: float, dt: float | None = None) -> SceneRunner: """Advance the scene by approximately N seconds worth of frames.""" frame_dt = dt or self._dt frames = max(1, int(seconds / frame_dt)) return self.advance_frames(frames, frame_dt)
[docs] def find(self, name_or_type, recursive: bool = True) -> Node | None: """Find a node by name (str) or by type in the scene tree.""" if self.tree.root is None: return None if isinstance(name_or_type, str): return self._find_by_name(self.tree.root, name_or_type) return self._find_by_type(self.tree.root, name_or_type, recursive)
[docs] def find_all(self, node_type: type) -> list[Node]: """Find all nodes of a given type in the scene.""" if self.tree.root is None: return [] result = [] if isinstance(self.tree.root, node_type): result.append(self.tree.root) result.extend(self.tree.root.find_all(node_type)) return result
def _find_by_name(self, node: Node, name: str) -> Node | None: if node.name == name: return node for child in node.children: result = self._find_by_name(child, name) if result: return result return None def _find_by_type(self, node: Node, node_type: type, recursive: bool) -> Node | None: if isinstance(node, node_type): return node if recursive: for child in node.children: result = self._find_by_type(child, node_type, recursive) if result: return result else: for child in node.children: if isinstance(child, node_type): return child return None
[docs] @property def root(self) -> Node | None: return self.tree.root
[docs] @property def frame_count(self) -> int: return self._frame_count
[docs] @property def elapsed_time(self) -> float: return self._time
[docs] def snapshot(self) -> dict: """Capture current scene state as a serializable dict.""" if self.tree.root is None: return {} return _snapshot_node(self.tree.root)
def _snapshot_node(node: Node) -> dict: """Recursively capture a node's state as a serializable dict.""" state: dict[str, Any] = { "name": node.name, "type": type(node).__name__, "settings": {}, "properties": {}, "children": [], } # Capture Property descriptor values. Mutable containers (list, dict, set) # are deep-copied so later mutation of the live property does NOT poison the # snapshot: without this, `scene_diff(before, after)` silently returns no # differences when an inventory/list-property is appended to between snaps. for name in type(node).get_properties(): try: val = getattr(node, name) if isinstance(val, (list, dict, set)): state["settings"][name] = copy.deepcopy(val) elif isinstance(val, (int, float, bool, str, tuple)): state["settings"][name] = val except (TypeError, ValueError, AttributeError): log.debug("Snapshot serialization failed for node", exc_info=True) # Capture spatial properties (not Settings, but essential for game testing) for attr in ("position", "rotation", "scale"): if hasattr(node, attr): val = getattr(node, attr) try: # Convert Vec2/Vec3/Quat to tuple for serializability if hasattr(val, "x") and hasattr(val, "y"): if hasattr(val, "z"): if hasattr(val, "w"): state["properties"][attr] = (float(val.x), float(val.y), float(val.z), float(val.w)) else: state["properties"][attr] = (float(val.x), float(val.y), float(val.z)) else: state["properties"][attr] = (float(val.x), float(val.y)) elif isinstance(val, (int, float)): state["properties"][attr] = val except (TypeError, ValueError, AttributeError): log.debug("Snapshot serialization failed for property", exc_info=True) # Capture visible state state["properties"]["visible"] = node.visible for child in node.children: state["children"].append(_snapshot_node(child)) return state