"""SceneRunner -- headless scene runner for testing game logic."""
import copy
import logging
from typing import Any
from ..input import Input
from ..node import Node
from ..scene_tree import SceneTree
# Module-level logger named after this module's import path.
log = logging.getLogger(__name__)
# Names exported by ``from <this module> import *``.
__all__ = ["SceneRunner"]
class SceneRunner:
    """Headless scene runner for testing game logic without rendering.

    Manages a SceneTree, advances frames, and provides query helpers.
    ``load``, ``advance_frames`` and ``advance_time`` return ``self`` so
    calls can be chained.
    """

    def __init__(self, screen_size: tuple[float, float] = (800, 600)):
        """Create a runner that owns a new ``SceneTree`` of *screen_size*."""
        self.tree = SceneTree(screen_size=screen_size)
        self._frame_count = 0
        self._dt = 1 / 60
        self._physics_dt = 1 / 60
        self._time = 0.0
        self._draw_renderer = None  # Set to a DrawLog for draw validation

    def load(self, root_node: Node) -> "SceneRunner":
        """Load a scene by setting the root node.

        Also resets the frame counter and the elapsed time to zero.
        """
        self.tree.set_root(root_node)
        self._frame_count = 0
        self._time = 0.0
        return self

    def advance_frames(
        self,
        count: int = 1,
        dt: float | None = None,
        draw: bool = False,
    ) -> "SceneRunner":
        """Advance the scene by N frames (tick + physics_tick each).

        If *draw* is True, also calls ``tree.render()`` each frame, using
        ``self._draw_renderer`` when one is set and a fresh ``DrawLog``
        otherwise. This exercises draw() methods and catches errors that
        would otherwise only surface with a real Vulkan renderer.

        Drains the :class:`InputSimulator` pending-release queue at the END
        of every iteration, after the input frame has rolled over, so
        ``sim.tap_key()`` can fire the release on a later frame than the
        press (preserving the ``is_action_just_pressed`` /
        ``is_action_just_released`` edges).

        Args:
            count: Number of frames to run.
            dt: Timestep passed to ``tree.tick``. ``None`` selects the
                runner's default; an explicit ``0.0`` is honoured as a
                zero-length frame. ``tree.physics_tick`` always receives
                the runner's fixed physics step, independent of *dt*.
            draw: Whether to call ``tree.render()`` each frame.

        Returns:
            This runner.
        """
        # ``dt or self._dt`` would silently replace an explicit 0.0 with the
        # default step, so compare against None instead.
        frame_dt = self._dt if dt is None else dt
        from .input_sim import InputSimulator

        for _ in range(count):
            self.tree.tick(frame_dt)
            self.tree.physics_tick(self._physics_dt)
            if draw:
                from ..ui.testing import DrawLog

                # Identity check: a configured renderer must be used even if
                # it happens to evaluate as falsy (e.g. an empty log).
                renderer = self._draw_renderer
                if renderer is None:
                    renderer = DrawLog()
                self.tree.render(renderer)
            Input._end_frame()
            Input._new_frame()
            # Drain queued tap_key releases AFTER the just-press edge has
            # been consumed by this frame's process pass. The release
            # writes into ``_keys_just_released_typed`` and survives until
            # the NEXT _end_frame, so the next iteration's process call
            # sees ``is_action_just_released`` exactly once.
            InputSimulator.flush_pending_releases()
            self._frame_count += 1
            self._time += frame_dt
        return self

    def simulate_until(
        self,
        predicate,
        max_ticks: int = 600,
        dt: float | None = None,
    ) -> bool:
        """Tick the scene until ``predicate(scene)`` returns truthy.

        Args:
            predicate: Callable receiving the scene root (or ``None`` if no
                root) and returning a truthy value when the desired state has
                been reached.
            max_ticks: Maximum frames to advance before giving up. Default
                600 (10 seconds at 60 FPS).
            dt: Frame timestep, forwarded to ``advance_frames``. Defaults to
                the runner's ``_dt`` (1/60 s).

        Returns:
            ``True`` if the predicate was satisfied within ``max_ticks``,
            ``False`` on timeout. The predicate is evaluated BEFORE the
            first tick so an already-satisfied state returns ``True`` with
            zero frames advanced.

        Replaces the per-test ``for _ in range(N): runner.advance_frames(1);
        if cond(): break`` boilerplate (You're the OS, PyDew Valley).
        """
        if predicate(self.root):
            return True
        for _ in range(max_ticks):
            self.advance_frames(1, dt=dt)
            if predicate(self.root):
                return True
        return False

    def advance_time(self, seconds: float, dt: float | None = None) -> "SceneRunner":
        """Advance the scene by approximately N seconds worth of frames.

        The frame count is ``seconds / dt`` truncated toward zero, and at
        least one frame is always run. A *dt* of ``None`` or ``0`` selects
        the runner's default step.
        """
        # Truthiness fallback is deliberate here: a zero step would divide
        # by zero on the next line.
        frame_dt = dt if dt else self._dt
        frames = max(1, int(seconds / frame_dt))
        return self.advance_frames(frames, frame_dt)

    def find(self, name_or_type, recursive: bool = True) -> Node | None:
        """Find a node by name (str) or by type in the scene tree.

        Args:
            name_or_type: A ``str`` is matched against ``node.name``; anything
                else is used as the second argument of ``isinstance``.
            recursive: For type lookups, ``False`` limits the search to the
                root and its direct children. Name lookups always search the
                whole tree and ignore this flag.

        Returns:
            The first match in depth-first pre-order, or ``None`` when there
            is no root or nothing matches.
        """
        scene_root = self.tree.root
        if scene_root is None:
            return None
        if isinstance(name_or_type, str):
            return self._find_by_name(scene_root, name_or_type)
        return self._find_by_type(scene_root, name_or_type, recursive)

    def find_all(self, node_type: type) -> list[Node]:
        """Find all nodes of a given type in the scene.

        The root is listed first when it matches, followed by whatever
        ``root.find_all(node_type)`` returns. An empty list is returned when
        no root is loaded.
        """
        scene_root = self.tree.root
        if scene_root is None:
            return []
        matches = [scene_root] if isinstance(scene_root, node_type) else []
        matches.extend(scene_root.find_all(node_type))
        return matches

    def _find_by_name(self, node: Node, name: str) -> Node | None:
        """Depth-first search for the first node whose ``name`` equals *name*."""
        if node.name == name:
            return node
        for child in node.children:
            found = self._find_by_name(child, name)
            # Compare with None: a matching node may itself be falsy.
            if found is not None:
                return found
        return None

    def _find_by_type(self, node: Node, node_type: type, recursive: bool) -> Node | None:
        """Return the first instance of *node_type* at or below *node*.

        With ``recursive=False`` only *node* and its direct children are
        inspected.
        """
        if isinstance(node, node_type):
            return node
        if not recursive:
            for child in node.children:
                if isinstance(child, node_type):
                    return child
            return None
        for child in node.children:
            found = self._find_by_type(child, node_type, recursive)
            # Compare with None: a matching node may itself be falsy.
            if found is not None:
                return found
        return None

    @property
    def root(self) -> Node | None:
        """The scene tree's current root node, or ``None`` if none is set."""
        return self.tree.root

    @property
    def frame_count(self) -> int:
        """Number of frames advanced since construction or the last ``load``."""
        return self._frame_count

    @property
    def elapsed_time(self) -> float:
        """Sum of the frame timesteps advanced since the last ``load``."""
        return self._time

    def snapshot(self) -> dict:
        """Capture current scene state as a serializable dict.

        Returns an empty dict when no root is loaded.
        """
        if self.tree.root is None:
            return {}
        return _snapshot_node(self.tree.root)
def _snapshot_node(node: Node) -> dict:
"""Recursively capture a node's state as a serializable dict."""
state: dict[str, Any] = {
"name": node.name,
"type": type(node).__name__,
"settings": {},
"properties": {},
"children": [],
}
# Capture Property descriptor values. Mutable containers (list, dict, set)
# are deep-copied so later mutation of the live property does NOT poison the
# snapshot: without this, `scene_diff(before, after)` silently returns no
# differences when an inventory/list-property is appended to between snaps.
for name in type(node).get_properties():
try:
val = getattr(node, name)
if isinstance(val, (list, dict, set)):
state["settings"][name] = copy.deepcopy(val)
elif isinstance(val, (int, float, bool, str, tuple)):
state["settings"][name] = val
except (TypeError, ValueError, AttributeError):
log.debug("Snapshot serialization failed for node", exc_info=True)
# Capture spatial properties (not Settings, but essential for game testing)
for attr in ("position", "rotation", "scale"):
if hasattr(node, attr):
val = getattr(node, attr)
try:
# Convert Vec2/Vec3/Quat to tuple for serializability
if hasattr(val, "x") and hasattr(val, "y"):
if hasattr(val, "z"):
if hasattr(val, "w"):
state["properties"][attr] = (float(val.x), float(val.y), float(val.z), float(val.w))
else:
state["properties"][attr] = (float(val.x), float(val.y), float(val.z))
else:
state["properties"][attr] = (float(val.x), float(val.y))
elif isinstance(val, (int, float)):
state["properties"][attr] = val
except (TypeError, ValueError, AttributeError):
log.debug("Snapshot serialization failed for property", exc_info=True)
# Capture visible state
state["properties"]["visible"] = node.visible
for child in node.children:
state["children"].append(_snapshot_node(child))
return state