"""Scripted demo playback — automated input sequences with narration and assertions.
Drive a game/UI scene with pre-recorded steps: move cursor, click, type text,
press keys, wait, assert state, and show narration overlays. Useful for
creating self-playing demos, tutorials, and integration tests.
Usage:
from simvx.core.scripted_demo import DemoRunner, MoveTo, Click, Narrate, Assert
steps = [
Narrate("Welcome to the demo!", duration=2.0),
MoveTo(200, 150, duration=0.5),
Click(200, 150),
Assert(lambda g: g.board[0][0] == "X", "Cell should be X"),
]
runner = DemoRunner(steps, speed=2.0)
game.add_child(runner)
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, ClassVar
from .engine import Input, Node, Signal
from .input.enums import _KEY_TO_NAME, Key
from .testing.input_sim import InputSimulator
log = logging.getLogger(__name__)
# Speed presets: (label, multiplier)
_SPEED_PRESETS: list[tuple[str, float]] = [
(">", 0.25),
(">>", 0.5),
(">>>", 50.0),
]
__all__ = [
"MoveTo",
"Click",
"TypeText",
"PressKey",
"Wait",
"Assert",
"Do",
"Scroll",
"Narrate",
"DemoRunner",
]
# ============================================================================
# Step dataclasses
# ============================================================================
[docs]
@dataclass
class MoveTo:
"""Smoothly move the virtual cursor to a screen position."""
x: float
y: float
duration: float = 0.5
[docs]
@dataclass
class Click:
"""Move to position then click (press + release)."""
x: float
y: float
button: int = 1
[docs]
@dataclass
class TypeText:
"""Type a string character by character."""
text: str
delay_per_char: float = 0.05
[docs]
@dataclass
class PressKey:
"""Press and hold a key for a duration."""
key: int
hold_duration: float = 0.1
[docs]
@dataclass
class Wait:
"""Pause playback for a duration."""
duration: float
[docs]
@dataclass
class Assert:
"""Run a check function against the game node (parent of DemoRunner)."""
check_fn: Any # callable(game_node) -> bool
message: str = ""
actual_fn: Any = None # optional callable(game_node) -> value for diagnostics
[docs]
@dataclass
class Do:
"""Execute an action against the game node (parent of DemoRunner).
Like Assert but semantically different — never fails on return value.
In test_mode, exceptions propagate; in interactive mode, they're logged.
"""
action: Any # callable(game_node)
message: str = ""
[docs]
@dataclass
class Narrate:
"""Display narration text at the bottom of the screen."""
text: str
duration: float = 2.0
# ============================================================================
# DemoRunner — Node that plays back a sequence of steps
# ============================================================================
[docs]
class DemoRunner(Node):
"""Plays a scripted sequence of demo steps, injecting input and drawing overlays.
Add as a child of the game/scene root. In test_mode, hotkeys are disabled
and assertions raise on failure.
Args:
steps: List of step dataclasses to execute in order.
test_mode: If True, skip hotkeys and raise on assertion failure.
on_complete: Optional callback invoked when all steps finish.
speed: Explicit speed override. If set, takes precedence over speed_mode.
speed_mode: Speed preset index (0=slow 0.5x, 1=fast 3x, 2=instant 50x). Default 1.
delay_between_steps: Natural pause (seconds) between steps. Default 0.15.
"""
# Step handler registry: maps step type -> handler(runner, step, dt)
_step_handlers: ClassVar[dict[type, Callable]] = {}
[docs]
@classmethod
def register_step_handler(cls, step_type: type, handler: Callable):
"""Register a handler for a custom step type.
Handler signature: (runner: DemoRunner, step, dt: float) -> None.
The handler must call runner._advance() when the step is complete.
"""
cls._step_handlers[step_type] = handler
def __init__(
self,
steps: list,
test_mode: bool = False,
on_complete: Callable | None = None,
speed: float | None = None,
speed_mode: int = 0,
delay_between_steps: float = 0.15,
**kwargs,
):
super().__init__(**kwargs)
self.name = "DemoRunner"
self._steps = list(steps)
self._test_mode = test_mode
self._on_complete = on_complete
# Speed: explicit override or preset
self._speed_mode = max(0, min(2, speed_mode))
if speed is not None:
self._speed = speed
# Find closest preset for display, or leave mode as-is
diffs = [abs(speed - p[1]) for p in _SPEED_PRESETS]
self._speed_mode = diffs.index(min(diffs))
else:
self._speed = _SPEED_PRESETS[self._speed_mode][1]
self._inter_step_delay = delay_between_steps
self._sim = InputSimulator()
self._cursor_pos = (0.0, 0.0)
self._current_step = 0
self._step_time = 0.0
self._paused = False
self._done = False
# Inter-step delay state
self._in_inter_delay = False
self._inter_delay_accum = 0.0
# Assertion tracking
self._failed = False
self._failures: list[str] = []
# Narration overlay
self._narration = ""
self._narration_time = 0.0
# Current action description for input visualization overlay
self._action_desc = ""
# Internal state for multi-phase steps
self._click_phase = 0 # 0=move, 1=press, 2=release
self._click_move_origin = (0.0, 0.0)
self._key_pressed = False
self._type_index = 0
self._type_accum = 0.0
# Clickable speed bar hit rects (populated each frame by draw())
self._speed_btn_rects: list[tuple[float, float, float, float]] = []
self._pause_btn_rect: tuple[float, float, float, float] = (0, 0, 0, 0)
# Signals
self.completed = Signal()
self.step_changed = Signal()
self.assertion_failed = Signal()
# ------------------------------------------------------------------ properties
@property
def current_step_index(self) -> int:
return self._current_step
@property
def total_steps(self) -> int:
return len(self._steps)
@property
def is_done(self) -> bool:
return self._done
@property
def failures(self) -> list[str]:
return list(self._failures)
# ------------------------------------------------------------------ lifecycle
def _set_speed_mode(self, mode: int):
"""Switch speed preset (0=slow, 1=fast, 2=instant)."""
self._speed_mode = max(0, min(2, mode))
self._speed = _SPEED_PRESETS[self._speed_mode][1]
def _check_speed_bar_click(self):
"""Check if a mouse click landed on a speed or pause button."""
from .input.enums import MouseButton
if not Input.is_mouse_button_just_pressed(MouseButton.LEFT):
return
mx, my = Input._mouse_pos
for i, (bx, by, bw, bh) in enumerate(self._speed_btn_rects):
if bx <= mx <= bx + bw and by <= my <= by + bh:
self._set_speed_mode(i)
return
px, py, pw, ph = self._pause_btn_rect
if px <= mx <= px + pw and py <= my <= py + ph:
self._paused = not self._paused
def _get_action_desc(self) -> str:
"""Return a human-readable description of the current step for the overlay."""
if self._current_step >= len(self._steps):
return ""
step = self._steps[self._current_step]
if isinstance(step, Click):
return f"Click ({int(step.x)}, {int(step.y)})"
if isinstance(step, TypeText):
typed = step.text[: self._type_index]
# Show only the current line's last few characters to keep overlay compact
last_line = typed.rsplit("\n", 1)[-1]
if len(last_line) > 20:
last_line = last_line[-20:]
return f"Type: {last_line}" if last_line else "Type: \u21b5"
if isinstance(step, PressKey):
name = _KEY_TO_NAME.get(step.key, Key(step.key).name if isinstance(step.key, int) else str(step.key))
return f"Key: {name}"
if isinstance(step, MoveTo):
return "Moving cursor..."
if isinstance(step, Assert):
return "Checking..."
if isinstance(step, Do):
return f"Doing: {step.message}" if step.message else "Doing..."
if isinstance(step, Wait):
return "..."
if isinstance(step, Scroll):
return f"Scroll {'down' if step.dy < 0 else 'up'}"
if isinstance(step, Narrate):
return ""
return ""
[docs]
def physics_process(self, dt: float):
# DemoRunner ticks in physics_process so that input simulation (PressKey, Click,
# TypeText) happens BEFORE the game node's process(), matching real GLFW/SDL input
# timing. This ensures is_action_just_pressed() sees the key on the same frame.
if self._done:
return
# Hotkey controls (only in interactive mode)
if not self._test_mode:
if Input.is_key_just_pressed(Key.SPACE):
self._paused = not self._paused
if Input.is_key_just_pressed(Key.ESCAPE):
self._finish()
return
if Input.is_key_just_pressed(Key.RIGHT):
self._skip_step()
return
# Clickable speed bar — check if mouse clicked on a speed button
self._check_speed_bar_click()
if self._paused:
return
# Tick narration
if self._narration_time > 0:
self._narration_time -= dt
if self._narration_time <= 0:
self._narration = ""
if self._current_step >= len(self._steps):
self._finish()
return
# Inter-step delay: natural pause between steps, scaled by speed
if self._in_inter_delay:
self._inter_delay_accum += dt
delay_scaled = self._inter_step_delay / max(self._speed, 0.01)
if self._inter_delay_accum >= delay_scaled:
self._in_inter_delay = False
self._inter_delay_accum = 0.0
else:
return
effective_dt = dt * self._speed
step = self._steps[self._current_step]
self._step_time += effective_dt
# Update action description for input visualization
self._action_desc = self._get_action_desc()
if isinstance(step, MoveTo):
self._process_move_to(step, effective_dt)
elif isinstance(step, Click):
self._process_click(step, effective_dt)
elif isinstance(step, TypeText):
self._process_type_text(step, effective_dt)
elif isinstance(step, PressKey):
self._process_press_key(step, effective_dt)
elif isinstance(step, Wait):
if self._step_time >= step.duration:
self._advance()
elif isinstance(step, Assert):
self._process_assert(step)
elif isinstance(step, Do):
self._process_do(step)
elif isinstance(step, Scroll):
self._process_scroll(step)
elif isinstance(step, Narrate):
self._process_narrate(step)
elif type(step) in self._step_handlers:
self._step_handlers[type(step)](self, step, effective_dt)
else:
log.warning("Unknown step type %s at step %d — skipping", type(step).__name__, self._current_step)
self._advance()
[docs]
def draw(self, renderer):
cx, cy = self._cursor_pos
# Draw cursor (yellow circle, red when clicking)
step = self._steps[self._current_step] if self._current_step < len(self._steps) else None
is_clicking = isinstance(step, Click) and self._click_phase >= 1
if is_clicking:
renderer.set_colour(255, 60, 60, 200)
else:
renderer.set_colour(255, 220, 50, 200)
renderer.draw_circle(cx, cy, 16, segments=16)
# Crosshair
renderer.set_colour(255, 255, 255, 150)
renderer.draw_line(cx - 24, cy, cx + 24, cy)
renderer.draw_line(cx, cy - 24, cx, cy + 24)
# Narration panel at bottom (fades out at end of display time)
if self._narration:
sw, sh = self._get_screen_size()
_FADE_DUR = 0.5
fade = min(self._narration_time / _FADE_DUR, 1.0) if self._narration_time < _FADE_DUR else 1.0
panel_h = 80
panel_y = sh - panel_h - 16
renderer.set_colour(0, 0, 0, int(200 * fade))
renderer.fill_rect(10, panel_y, sw - 20, panel_h)
renderer.set_colour(255, 255, 255, int(240 * fade))
renderer.draw_text(self._narration, (28, panel_y + 18), scale=2.0)
# --- Speed controls + play/pause (top-center, clickable) ---
sw, _ = self._get_screen_size()
bar_w = 400
bar_h = 48
bar_x = (sw - bar_w) / 2
bar_y = 8
renderer.set_colour(0, 0, 0, 140)
renderer.fill_rect(bar_x, bar_y, bar_w, bar_h)
x_off = bar_x + 12
self._speed_btn_rects.clear()
for i, (label, _mult) in enumerate(_SPEED_PRESETS):
btn_w = len(label) * 24 + 28
self._speed_btn_rects.append((x_off - 6, bar_y, btn_w, bar_h))
if i == self._speed_mode:
renderer.set_colour(255, 255, 255, 255)
else:
renderer.set_colour(120, 120, 120, 180)
renderer.draw_text(label, (x_off, 16), scale=1.6)
x_off += btn_w
# --- Play/Pause indicator (clickable) ---
pause_x = x_off - 6
pause_w = bar_x + bar_w - pause_x
self._pause_btn_rect = (pause_x, bar_y, pause_w, bar_h)
if self._paused:
renderer.set_colour(255, 165, 0, 255)
renderer.draw_text("PAUSED", (x_off, 16), scale=1.6)
else:
renderer.set_colour(100, 220, 100, 200)
renderer.draw_text(">", (x_off, 16), scale=1.6)
# Step counter in top-right
counter = f"{self._current_step + 1}/{len(self._steps)}"
renderer.set_colour(200, 200, 200, 180)
renderer.draw_text(counter, (sw - 160, 16), scale=1.6)
# --- Input visualization (below step counter) ---
if self._action_desc:
desc_w = min(len(self._action_desc) * 18 + 32, 480)
desc_x = sw - desc_w - 16
renderer.set_colour(0, 0, 0, 140)
renderer.fill_rect(desc_x, 60, desc_w, 44)
renderer.set_colour(180, 220, 255, 220)
renderer.draw_text(self._action_desc, (desc_x + 16, 68), scale=1.4)
# ------------------------------------------------------------------ step processors
def _process_move_to(self, step: MoveTo, dt: float):
t = min(self._step_time / max(step.duration, 0.001), 1.0)
t = _smooth_step(t)
ox, oy = self._click_move_origin
self._cursor_pos = (
ox + (step.x - ox) * t,
oy + (step.y - oy) * t,
)
self._sim.move_mouse(self._cursor_pos[0], self._cursor_pos[1])
if t >= 1.0:
self._cursor_pos = (step.x, step.y)
self._sim.move_mouse(step.x, step.y)
self._advance()
def _process_click(self, step: Click, dt: float):
if self._click_phase == 0:
# Move phase — lerp to target
move_dur = 0.3 / max(self._speed, 0.01)
t = min(self._step_time / max(move_dur, 0.001), 1.0)
t = _smooth_step(t)
ox, oy = self._click_move_origin
self._cursor_pos = (
ox + (step.x - ox) * t,
oy + (step.y - oy) * t,
)
self._sim.move_mouse(self._cursor_pos[0], self._cursor_pos[1])
if t >= 1.0:
self._cursor_pos = (step.x, step.y)
self._sim.move_mouse(step.x, step.y)
self._click_phase = 1
self._step_time = 0.0
elif self._click_phase == 1:
# Press — inject into Input singleton AND dispatch UI event
self._sim.press_mouse(step.button, (step.x, step.y))
self._dispatch_ui_mouse(step.button, True)
self._click_phase = 2
self._step_time = 0.0
elif self._click_phase == 2:
# Release after brief hold
if self._step_time >= 0.05:
self._sim.release_mouse(step.button)
self._dispatch_ui_mouse(step.button, False)
self._advance()
def _dispatch_ui_mouse(self, button: int, pressed: bool):
"""Dispatch a UI mouse event through the scene tree so widgets respond."""
if self._tree:
self._tree.ui_input(mouse_pos=self._cursor_pos, button=button, pressed=pressed)
def _process_type_text(self, step: TypeText, dt: float):
self._type_accum += dt
while self._type_accum >= step.delay_per_char and self._type_index < len(step.text):
ch = step.text[self._type_index]
if ch == "\n":
# Newline → dispatch Enter key press/release
if self._tree:
self._tree.ui_input(key="enter", pressed=True)
self._tree.ui_input(key="enter", pressed=False)
else:
# Dispatch character through the scene tree so focused widgets receive it
if self._tree:
self._tree.ui_input(char=ch)
# Also inject into Input singleton for non-UI consumers
key_code = ord(ch.upper()) if ch.isalpha() else ord(ch)
self._sim.tap_key(key_code)
self._type_index += 1
self._type_accum -= step.delay_per_char
if self._type_index >= len(step.text):
self._advance()
def _process_press_key(self, step: PressKey, dt: float):
if not self._key_pressed:
self._sim.press_key(step.key)
# Also dispatch through UI system so popups/focused widgets receive key events
if self._tree:
key_name = _KEY_TO_NAME.get(Key(step.key) if isinstance(step.key, int) else step.key, "")
if not key_name and isinstance(step.key, int):
try:
key_name = Key(step.key).name.lower()
except ValueError:
key_name = f"key_{step.key}"
if key_name:
self._tree.ui_input(key=key_name, pressed=True)
self._key_pressed = True
if self._step_time >= step.hold_duration:
self._sim.release_key(step.key)
if self._tree:
key_name = _KEY_TO_NAME.get(Key(step.key) if isinstance(step.key, int) else step.key, "")
if not key_name and isinstance(step.key, int):
try:
key_name = Key(step.key).name.lower()
except ValueError:
key_name = f"key_{step.key}"
if key_name:
self._tree.ui_input(key=key_name, pressed=False)
self._advance()
def _process_assert(self, step: Assert):
game_node = self.parent
try:
result = step.check_fn(game_node)
except Exception as exc:
import traceback
result = False
tb = traceback.format_exc()
step = Assert(check_fn=step.check_fn, message=f"{step.message} (raised {exc})\n{tb}")
if not result:
msg = step.message or f"Assertion failed at step {self._current_step}"
if step.actual_fn is not None:
try:
actual = step.actual_fn(game_node)
msg = f"{msg} (actual: {actual!r})"
except Exception:
pass
self._failed = True
self._failures.append(msg)
self.assertion_failed(msg)
if self._test_mode:
raise AssertionError(msg)
self._advance()
def _process_do(self, step: Do):
game_node = self.parent
try:
step.action(game_node)
except Exception as exc:
if self._test_mode:
raise
log.error("Do step failed at step %d: %s — %s", self._current_step, step.message, exc)
self._advance()
def _process_scroll(self, step: Scroll):
"""Move cursor to position and dispatch scroll events through UI."""
self._cursor_pos = (step.x, step.y)
self._sim.move_mouse(step.x, step.y)
if self._tree:
self._tree.ui_input(mouse_pos=(step.x, step.y), key="scroll_down" if step.dy < 0 else "scroll_up", pressed=True)
# Dispatch multiple scroll ticks for larger scroll amounts
ticks = max(1, int(abs(step.dy)))
key = "scroll_down" if step.dy < 0 else "scroll_up"
for _ in range(ticks - 1):
self._tree.ui_input(mouse_pos=(step.x, step.y), key=key, pressed=True)
self._advance()
def _process_narrate(self, step: Narrate):
if self._step_time == 0 or (self._step_time < 0.02):
# First frame of narration step — persist for 2x the step duration
self._narration = step.text
self._narration_time = step.duration * 2.0
if self._step_time >= step.duration:
self._advance()
# ------------------------------------------------------------------ navigation
def _advance(self):
"""Move to the next step, resetting per-step state."""
self._current_step += 1
self._step_time = 0.0
self._click_phase = 0
self._click_move_origin = self._cursor_pos
self._key_pressed = False
self._type_index = 0
self._type_accum = 0.0
self._action_desc = ""
self.step_changed(self._current_step)
if self._current_step >= len(self._steps):
self._finish()
elif self._inter_step_delay > 0:
# Enter inter-step delay so the demo has a natural pause
self._in_inter_delay = True
self._inter_delay_accum = 0.0
def _skip_step(self):
"""Skip current step immediately (interactive mode)."""
step = self._steps[self._current_step] if self._current_step < len(self._steps) else None
# Clean up in-flight input
if isinstance(step, PressKey) and self._key_pressed:
self._sim.release_key(step.key)
if isinstance(step, Click) and self._click_phase == 1:
self._sim.release_mouse(step.button)
self._advance()
def _finish(self):
"""Mark playback as done and fire completion."""
if self._done:
return
self._done = True
self.completed()
if self._on_complete:
self._on_complete()
# ------------------------------------------------------------------ class methods
[docs]
@classmethod
def run_headless(
cls,
scene: Node,
steps: list,
*,
speed: float = 50.0,
screen_size: tuple[int, int] = (800, 600),
max_frames: int = 20000,
delay_between_steps: float = 0.0,
) -> bool:
"""Run a demo headlessly and return True if all steps pass.
Creates a DemoRunner in test_mode, adds it to *scene*, and advances
frames via SceneRunner until completion or *max_frames* is reached.
"""
from .testing.scene_runner import SceneRunner as _SR
runner = cls(steps, test_mode=True, speed=speed, delay_between_steps=delay_between_steps)
scene.add_child(runner)
sr = _SR(screen_size=screen_size)
sr.load(scene)
for _ in range(max_frames):
sr.advance_frames(1)
if runner.is_done:
break
if not runner.is_done:
log.error("Stuck at step %d/%d", runner.current_step_index + 1, runner.total_steps)
if runner.failures:
for f in runner.failures:
log.error(" %s", f)
return False
if runner.failures:
for f in runner.failures:
log.error("FAIL: %s", f)
return False
log.info("OK: All %d steps passed", runner.total_steps)
return True
[docs]
@classmethod
def run_visual(
cls,
scene: Node,
steps: list,
*,
speed: float | None = None,
speed_mode: int = 0,
title: str = "Demo",
width: int = 800,
height: int = 600,
backend: str | None = None,
):
"""Run a demo visually with the Vulkan App.
Lazily imports ``simvx.graphics.App`` to keep core free of graphics deps.
"""
from simvx.graphics import App
if speed is not None:
runner = cls(steps, test_mode=False, speed=speed)
else:
runner = cls(steps, test_mode=False, speed_mode=speed_mode)
scene.add_child(runner)
App(title=title, width=width, height=height, backend=backend).run(scene)
# ------------------------------------------------------------------ internals
def _get_screen_size(self) -> tuple[float, float]:
if self._tree:
ss = self._tree.screen_size
return (float(ss[0]), float(ss[1])) if not isinstance(ss, tuple) else ss
return (800.0, 600.0)
# ============================================================================
# Utilities
# ============================================================================
def _smooth_step(t: float) -> float:
"""Hermite smoothstep for natural cursor movement."""
t = max(0.0, min(1.0, t))
return t * t * (3.0 - 2.0 * t)