"""Graphics application layer: engine wrapper with node tree support."""
import logging
import time
import weakref
from collections.abc import Callable
from typing import Any
from .engine import Engine
from .platform import AVAILABLE_BACKENDS
log = logging.getLogger(__name__)
__all__ = ["App", "AVAILABLE_BACKENDS"]
def _get_sdl3_type() -> type:
"""Return the Sdl3Backend class, or a dummy type if not importable."""
try:
from .platform._sdl3 import Sdl3Backend
return Sdl3Backend
except (ImportError, ModuleNotFoundError):
return type(None) # will never match isinstance()
[docs]
class App:
"""Graphics application wrapper. Supports both raw callbacks and node tree scenes.
Args:
title: Window title string.
width: Window width in pixels (must be >= 1).
height: Window height in pixels (must be >= 1).
backend: Windowing backend name. One of ``"glfw"`` (desktop),
``"sdl3"`` (touch/mobile), ``"qt"`` (embedding). ``None`` auto-detects.
physics_fps: Fixed-timestep physics simulation rate (Hz). Independent
of the display rate; the run loop drives physics via an
accumulator so changing this never affects render cadence.
target_fps: Maximum display frame rate. ``None`` (default) means
uncapped (vsync-gated only). Mirrors Godot's ``Engine.max_fps``,
Unity's ``Application.targetFrameRate``, and Bevy's
``FrameRateLimit::Limit``. Combine with ``vsync=False`` to make
the cap dominant on high-refresh displays.
visible: Open a visible window. Set ``False`` for headless tests.
vsync: Enable vertical sync at present time.
bg_colour: Background clear colour. ``"transparent"``, an RGBA tuple,
or ``None`` to use the theme background.
Usage with node tree (recommended)::
app = App(title="My Game", width=1280, height=720)
app.run(MyGameScene()) # Node subclass with ready()/process()
Usage with raw callbacks::
app = App(title="My Game")
app.run(update=my_update, render=my_render)
After ``run()`` has created the window, ``window_title``, ``window_size``,
and ``active_backend`` reflect the live state.
"""
# Weakref to the most recently constructed App, exposed via App.current().
# Lets non-Node helpers (audio code, utilities) reach the running app
# without a circular dependency. Held as a weakref so test suites that
# spin up many short-lived Apps don't leak them.
_current_ref: weakref.ReferenceType[App] | None = None
[docs]
@classmethod
def current(cls) -> App | None:
"""Return the running App instance, or ``None`` if no App is alive.
Prefer ``node.app`` from inside Node subclasses (after
``on_enter_tree``); use this from non-Node helpers like audio
callbacks, utility modules, or tools that don't have a node handle.
"""
ref = cls._current_ref
return ref() if ref is not None else None
def __init__(
self,
title: str = "SimVX",
width: int = 1280,
height: int = 720,
backend: str | None = None,
physics_fps: int = 60,
target_fps: int | None = None,
visible: bool = True,
vsync: bool = True,
bg_colour: tuple[float, float, float, float] | str | None = None,
audio_sample_rate: int = 48000,
audio_channels: int = 2,
**_kwargs,
):
if not isinstance(title, str):
raise TypeError(f"App title must be a string, got {type(title).__name__}")
if not isinstance(width, int) or not isinstance(height, int):
raise TypeError("App width and height must be integers")
if width < 1 or height < 1:
raise ValueError(f"App width and height must be >= 1, got {width}x{height}")
if target_fps is not None and target_fps < 1:
raise ValueError(f"target_fps must be >= 1 or None, got {target_fps}")
if backend is not None and backend not in AVAILABLE_BACKENDS:
raise ValueError(
f"Unknown backend {backend!r}. Allowed: {list(AVAILABLE_BACKENDS)}"
)
if not isinstance(audio_sample_rate, int) or audio_sample_rate <= 0:
raise ValueError(
f"audio_sample_rate must be a positive int, got {audio_sample_rate!r}"
)
if audio_channels not in (1, 2):
raise ValueError(
f"audio_channels must be 1 (mono) or 2 (stereo), got {audio_channels!r}"
)
self.title = title
self.width = width
self.height = height
self._backend_name = backend
self._physics_fps = physics_fps
self._target_fps = target_fps
self._visible = visible
self._vsync = vsync
self._bg_colour = bg_colour
self._audio_sample_rate = audio_sample_rate
self._audio_channels = audio_channels
self._engine: Engine | None = None
# Per-run SubViewport offscreen render-target manager (graphics-side
# driver for core.SubViewport render-to-texture). Created in setup().
self._sub_viewports: Any = None
# Global slow-mo / hitstop / fast-forward multiplier applied to the
# per-frame ``dt`` *before* it reaches ``tree.tick()`` and
# ``tree.physics_tick()``. ``1.0`` = real-time; ``0.3`` = 30%
# speed; ``0.0`` = paused (tick/physics still run, but with zero
# dt). Audio pitching and coroutine pacing are intentionally NOT
# affected by this knob: only the scene-tree dt is scaled.
self._time_scale: float = 1.0
# Snapshot of per-frame telemetry from the most recent
# ``run_headless`` call. Captured BEFORE the engine shuts down so
# tests / harnesses can read renderer counters after the call
# returns (the engine nulls its ``_renderer`` in shutdown).
# Keys populated today:
# ``draw2d_draw_count`` : Draw2DPass.last_frame_draw_count of the
# final fully rendered frame.
# ``gpu_phase_times`` : per-pass GPU timings (label → ms),
# copied from ``Engine.gpu_phase_times``.
# ``frames_rendered`` : total update() calls that produced a frame.
self.last_telemetry: dict[str, Any] = {}
type(self)._current_ref = weakref.ref(self)
[docs]
def toggle_fullscreen(self) -> None:
"""Toggle fullscreen mode."""
if self._engine:
self._engine.toggle_fullscreen()
[docs]
@property
def is_fullscreen(self) -> bool:
return self._engine.is_fullscreen if self._engine else False
[docs]
@property
def engine(self) -> Engine | None:
"""Access the graphics engine (available after run() starts)."""
return self._engine
[docs]
@property
def texture_manager(self):
"""The renderer's :class:`TextureManager` (available after run() starts).
Convenience accessor: forwards to ``app.engine.texture_manager`` so
scenes that upload runtime-generated pixels (e.g. rasterised text)
don't need to reach into ``_engine``.
"""
return self._engine.texture_manager if self._engine is not None else None
[docs]
@property
def scene_adapter(self):
"""Scene adapter bridging SceneTree to the renderer (available after run() starts)."""
return getattr(self, "_scene_adapter", None)
[docs]
@property
def active_backend(self) -> str | None:
"""Return the resolved backend name (e.g. ``"glfw"``), or ``None`` if
the engine has not yet initialised a window."""
if self._engine is None:
return None
return self._engine._resolved_backend_name
@property
def window_title(self) -> str:
"""Current window title. Writable once the window exists."""
if self._engine is not None:
return self._engine.title
return self.title
[docs]
@window_title.setter
def window_title(self, value: str) -> None:
if not isinstance(value, str):
raise TypeError(f"window_title must be a string, got {type(value).__name__}")
self.title = value
if self._engine is not None:
self._engine.title = value
window = getattr(self._engine, "_window", None)
if window is not None and hasattr(window, "set_title"):
window.set_title(value)
@property
def window_size(self) -> tuple[int, int]:
"""Current window ``(width, height)``. Writable once the window exists."""
if self._engine is not None:
return (self._engine.width, self._engine.height)
return (self.width, self.height)
[docs]
@window_size.setter
def window_size(self, value: tuple[int, int]) -> None:
if not isinstance(value, tuple | list) or len(value) != 2:
raise TypeError("window_size must be a (width, height) tuple")
w, h = int(value[0]), int(value[1])
if w < 1 or h < 1:
raise ValueError(f"window_size must be >= 1x1, got {w}x{h}")
self.width = w
self.height = h
if self._engine is not None:
self._engine.window_size = (w, h)
[docs]
@property
def cursor_pos(self) -> tuple[float, float]:
"""Current cursor position in screen coordinates, ``(0.0, 0.0)`` before run()."""
if self._engine is not None:
return self._engine.cursor_pos
return (0.0, 0.0)
[docs]
@property
def vsync(self) -> bool:
"""Whether vertical sync is currently enabled."""
if self._engine is not None:
return self._engine.vsync
return self._vsync
[docs]
def set_vsync(self, value: bool) -> None:
"""Toggle vsync at runtime.
Before ``run()`` this just stores the boot-time value. After
``run()``, the engine recreates the swapchain with the new
present mode (FIFO when on, MAILBOX/IMMEDIATE when off).
"""
value = bool(value)
self._vsync = value
engine = getattr(self, "_engine", None)
if engine is not None:
engine.set_vsync(value)
@property
def time_scale(self) -> float:
"""Global time-scale multiplier for the scene-tree tick.
Applied to ``dt`` before delivery to ``tree.process()`` and
``tree.physics_process()``. ``1.0`` is real-time, ``0.3`` is 30%
speed (slow-motion / hitstop), ``2.0`` is double speed, ``0.0`` is
paused (still ticks, with zero dt). Default ``1.0``.
Scope is intentionally narrow: only the scene-tree dt is scaled.
Audio playback rate, coroutine pacing, and the wall-clock physics
accumulator are NOT touched: slowing audio + coroutines requires a
separate design pass. Read :attr:`simvx.core.SceneTree.now` for
the scaled scene clock.
"""
return self._time_scale
[docs]
@time_scale.setter
def time_scale(self, value: float) -> None:
v = float(value)
if v < 0.0:
raise ValueError(f"App.time_scale must be >= 0, got {v}")
self._time_scale = v
[docs]
@property
def last_draw2d_draw_count(self) -> int | None:
"""``Draw2DPass.last_frame_draw_count`` from the most recent
``run_headless`` call's final rendered frame.
``None`` if no run has happened yet or the renderer reported no
Draw2D pass for this scene.
"""
return self.last_telemetry.get("draw2d_draw_count")
[docs]
def run(
self,
root_or_update: Any = None,
*,
update: Callable[[], None] | None = None,
render: Callable[[object, tuple[int, int]], None] | None = None,
) -> None:
"""Run the graphics engine.
Args:
root_or_update: Either a Node (scene root) or a per-frame update callback.
When a Node is passed, SceneTree and input are set up automatically.
update: Per-frame callback (alternative to the positional arg).
render: Custom render callback (cmd, extent). Only used in callback mode.
"""
from simvx.core import Node
if isinstance(root_or_update, Node):
self._run_with_tree(root_or_update)
else:
cb = root_or_update or update
self._run_with_callbacks(cb, render)
def _init_audio_backend(self, tree) -> None:
"""Attach an audio backend to the scene tree (best-effort).
`make_backend()` prefers the native ma_engine path (built via
`simvx build-audio`) and falls back to the pure-Python mixer if
the native extension isn't loaded, so games still produce sound
on systems without a C compiler at install time. Sample rate
and channel count come from the `App(audio_sample_rate=,
audio_channels=)` constructor kwargs.
"""
try:
from simvx.core.audio_backend import make_backend
tree.install_audio_backend(
make_backend(
sample_rate=self._audio_sample_rate,
nchannels=self._audio_channels,
)
)
except (ImportError, OSError) as exc:
# Justified: the audio_backend module (or its native dependency) may be
# absent, or no audio device is present: the engine still runs silently.
# ``make_backend`` already degrades to the null backend internally, so the
# only escapes here are an import failure or a device-level OSError. A
# deliberate ``AudioBackendUnavailable`` (SIMVX_ALLOW_LEGACY_AUDIO=0) is
# NOT caught: it is a fail-loud opt-in and must propagate.
log.warning("Audio backend unavailable; running without sound: %s", exc, exc_info=True)
@staticmethod
def _shutdown_audio(tree) -> None:
"""Stop the audio backend so its playback thread doesn't block process exit."""
backend = getattr(tree, "_audio_backend", None)
if backend and hasattr(backend, "shutdown"):
backend.shutdown()
def _run_with_callbacks(
self,
update: Callable | None,
render: Callable | None,
) -> None:
"""Run with raw per-frame update/render callbacks (no scene tree)."""
self._engine = Engine(
width=self.width,
height=self.height,
title=self.title,
backend=self._backend_name,
visible=self._visible,
vsync=self._vsync,
target_fps=self._target_fps,
)
if self._bg_colour == "transparent":
self._engine.clear_colour = [0.0, 0.0, 0.0, 0.0]
elif isinstance(self._bg_colour, tuple | list):
self._engine.clear_colour = list(self._bg_colour)
else:
from simvx.core.ui.theme import get_theme as _gt
c = _gt().bg_black
self._engine.clear_colour = [c[0], c[1], c[2], c[3]]
self._engine.run(callback=update, render=render)
def _run_with_tree(self, root_node: Any) -> None:
"""Run with a node tree: full engine integration."""
from simvx.core import Input, MouseButton, SceneTree
from .draw2d import Draw2D
from .input_adapter import (
char_callback,
cursor_pos_callback,
key_callback_with_ui,
mouse_button_callback_with_ui,
poll_gamepads,
scroll_callback,
set_ui_callbacks,
)
from .renderer.sub_viewport import SubViewportManager
from .scene_adapter import SceneAdapter
self._engine = Engine(
width=self.width,
height=self.height,
title=self.title,
backend=self._backend_name,
visible=self._visible,
vsync=self._vsync,
target_fps=self._target_fps,
)
# Resolve bg_colour: "transparent" → RGBA zero, tuple → fixed, None → theme-driven
if self._bg_colour == "transparent":
fixed_bg: list[float] | None = [0.0, 0.0, 0.0, 0.0]
elif isinstance(self._bg_colour, tuple | list):
fixed_bg = list(self._bg_colour)
else:
fixed_bg = None # syncs from theme each frame
if fixed_bg is not None:
self._engine.clear_colour = fixed_bg
else:
# Set initial clear colour from theme immediately
from simvx.core.ui.theme import get_theme as _init_gt
c = _init_gt().bg_black
self._engine.clear_colour = [c[0], c[1], c[2], c[3]]
last_theme_gen = -1 # track theme changes for bg sync
tree = SceneTree(screen_size=(self.width, self.height))
self._init_audio_backend(tree)
adapter: SceneAdapter | None = None
physics_dt = 1.0 / self._physics_fps
physics_accum = 0.0
last_time = 0.0
last_mouse_pos = (0.0, 0.0)
# --- UI input routing callbacks ---
def _ui_char(ch: str):
tree.ui_input(char=ch)
def _ui_key(key_name: str, pressed: bool):
tree.ui_input(key=key_name, pressed=pressed)
def _ui_mouse(button: MouseButton, pressed: bool):
tree.ui_input(mouse_pos=Input._mouse_pos, button=button, pressed=pressed)
def _ui_motion(pos: tuple[float, float]):
nonlocal last_mouse_pos
if pos != last_mouse_pos:
last_mouse_pos = pos
tree.ui_input(mouse_pos=pos, button=None, pressed=False)
def _ui_touch(finger_id: int, action: int, x: float, y: float):
tree.touch_input(finger_id, action, x, y)
def setup():
nonlocal adapter, last_time
# Wire UI routing callbacks. tree_propagate dispatches every input
# event to the SceneTree's @on_input handlers via typed dispatch
# tables (see SceneTree.propagate_input).
set_ui_callbacks(
char=_ui_char,
key=_ui_key,
mouse=_ui_mouse,
motion=_ui_motion,
scroll=_ui_key, # scroll events use same key routing
touch=_ui_touch,
tree_propagate=tree.propagate_input,
)
# Wire input (with UI routing)
self._engine.set_key_callback(key_callback_with_ui)
self._engine.set_mouse_button_callback(mouse_button_callback_with_ui)
self._engine.set_cursor_pos_callback(cursor_pos_callback)
self._engine.set_scroll_callback(scroll_callback)
self._engine.set_char_callback(char_callback)
# Wire touch callback if backend supports it (SDL3)
win = self._engine._window
if hasattr(win, "set_touch_callback"):
from .input_adapter import touch_callback
win.set_touch_callback(touch_callback)
# Wire clipboard for UI widgets (backend-specific)
try:
from simvx.core.ui.clipboard import set_backend as _set_clipboard
win = self._engine._window
if isinstance(win, _get_sdl3_type()):
# SDL3 clipboard: works without a window handle
import sdl3 as _sdl
def _sdl3_paste():
val = _sdl.SDL_GetClipboardText()
if val is None:
return ""
return val.decode("utf-8", errors="replace") if isinstance(val, bytes) else str(val)
_set_clipboard(lambda text: _sdl.SDL_SetClipboardText(text.encode()), _sdl3_paste)
else:
# GLFW clipboard
import glfw
_win = win._window
def _glfw_paste():
val = glfw.get_clipboard_string(_win)
if val is None:
return ""
return val.decode("utf-8", errors="replace") if isinstance(val, bytes) else str(val)
_set_clipboard(lambda text: glfw.set_clipboard_string(_win, text), _glfw_paste)
except (ImportError, AttributeError) as exc:
# Justified: the platform clipboard backend (sdl3/glfw) may be missing or
# lack an expected symbol; UI widgets fall back to a no-op clipboard. Logged
# at debug since this is a minor, non-fatal degradation.
log.debug("Clipboard backend unavailable; using no-op clipboard: %s", exc, exc_info=True)
# Store HiDPI content scale on theme (informational).
# Do NOT scale Draw2D._base_height: the 2x framebuffer
# provides sharper glyphs automatically. Scaling _base_height
# would double text size in logical coordinates while widget
# sizes stay at 1x, causing text to overflow containers.
sx, _sy = self._engine.content_scale
if sx > 1.01:
from simvx.core.ui.theme import get_theme
get_theme().ui_scale = sx
get_theme()._sync_dicts()
# Create renderer + scene adapter
renderer = self._engine.renderer
adapter = SceneAdapter(self._engine, renderer)
self._scene_adapter = adapter # Exposed for editor game viewport rendering
self._sub_viewports = SubViewportManager(self._engine, adapter)
# Expose app and window handle on the tree for runtime access
tree._app = self # type: ignore[attr-defined]
tree._platform_window = getattr(self._engine._window, "_window", None)
self._engine._scene_tree = tree # type: ignore[attr-defined]
self._engine._scene_adapter = adapter # type: ignore[attr-defined] # for ReflectionProbePass
# Set scene root (triggers ready() on all nodes)
tree.set_root(root_node)
last_time = time.perf_counter()
def update():
"""Game logic: runs BEFORE any GPU command recording."""
nonlocal physics_accum, last_time, last_theme_gen
from simvx.core.debug import Debug
p = Debug.profiler
now = time.perf_counter()
frame_dt = min(now - last_time, 0.1) # Cap at 100ms
last_time = now
p.begin("total")
# Fixed-timestep physics. The wall-clock accumulator runs at
# real-time; only the dt delivered to nodes is scaled by
# ``time_scale`` so slow-motion does not stretch the physics
# accumulator and pile up steps when the scale returns to 1.0.
p.begin("physics")
physics_accum += frame_dt
scaled_physics_dt = physics_dt * self._time_scale
while physics_accum >= physics_dt:
tree.physics_tick(scaled_physics_dt)
physics_accum -= physics_dt
p.end("physics")
# Poll gamepads
poll_gamepads(self._engine._window)
# Sync screen size with window size (UI coordinates match mouse positions)
if self._engine._window:
ws = self._engine._window.get_window_size()
if tree.screen_size != (ws[0], ws[1]):
tree.screen_size = (ws[0], ws[1])
# Re-query cursor position after resize so Input._mouse_pos
# stays consistent with the new window dimensions.
if hasattr(self._engine._window, "get_cursor_pos"):
cx, cy = self._engine._window.get_cursor_pos()
Input._mouse_pos = (cx, cy)
# Sync background colour from theme (when not explicitly overridden)
if fixed_bg is None:
from simvx.core.ui.theme import get_theme as _gt
from simvx.core.ui.theme import theme_generation as _tg
gen = _tg()
if gen != last_theme_gen:
last_theme_gen = gen
c = _gt().bg_black
self._engine.clear_colour = [c[0], c[1], c[2], c[3]]
# Game logic: scale dt by App.time_scale so slow-mo and
# hitstop slow node ``on_process`` callbacks and SceneTree.now.
p.begin("process")
tree.tick(frame_dt * self._time_scale)
p.end("process")
# Mouse motion is now dispatched immediately from cursor_pos_callback
# via _ui.motion callback, so no per-frame dispatch needed here.
# 2D drawing pass: nodes submit Draw2D geometry
p.begin("draw")
Draw2D._reset()
tree.render(Draw2D)
p.end("draw")
# Mouse picking on click
if Input._keys_just_pressed.get("mouse_1"):
tree.input_cast(Input._mouse_pos, button=MouseButton.LEFT)
# Submit scene data for rendering (populates instances, materials, lights)
p.begin("submit")
renderer = self._engine.renderer
renderer.begin_frame()
adapter.submit_scene(tree)
p.end("submit")
p.end("total")
p.end_frame()
# Clear per-frame input state. ``_end_frame`` closes out the just-
# press/release edges consumed by this tick; ``_new_frame`` re-arms
# the typed buffers so any platform events that fire between now
# and the next tick (gamepad polling, injected events, etc.) start
# from a clean slate. Mirrors ``SceneRunner.advance_frames``.
Input._end_frame()
Input._new_frame()
def pre_render_fn(cmd):
"""Offscreen passes (shadows): after game logic, before main render pass."""
renderer = self._engine.renderer
# Render SubViewports into their offscreen textures first (on a
# dedicated, waited one-time command buffer), so the main pass can
# sample them this frame. That path runs begin_frame + submit_scene
# on each sub-tree, clobbering the main scene's submission lists,
# so re-submit the main scene afterwards. Only pay the re-submit
# cost when at least one SubViewport actually rendered.
resubmit = bool(
self._sub_viewports is not None and adapter is not None
and self._sub_viewports.render_all(tree)
)
# Capture any new / requested reflection probes (also clobbers the
# main scene's submission lists, so re-submit afterwards).
if adapter is not None and renderer.capture_reflection_probes(adapter):
resubmit = True
if resubmit:
renderer.begin_frame()
adapter.submit_scene(tree)
renderer._upload_transforms()
renderer.pre_render(cmd)
def render_fn(cmd, extent):
"""Main render pass: inside vkCmdBeginRenderPass."""
renderer = self._engine.renderer
renderer.render(cmd)
def _cleanup_sub_viewports():
"""Release SubViewport offscreen targets while the device is alive."""
if self._sub_viewports is not None:
self._sub_viewports.destroy()
try:
self._engine.run(
callback=update,
setup=setup,
render=render_fn,
pre_render=pre_render_fn,
cleanup=_cleanup_sub_viewports,
)
finally:
self._sub_viewports = None
self._shutdown_audio(tree)
[docs]
def quit(self) -> None:
"""Request a clean shutdown of the running app.
Safe to call from process/physics_process or input handlers. The main
loop exits at the end of the current frame; audio and Vulkan resources
are torn down in the usual finally-paths. Prefer this over ``sys.exit``
which can leave background threads (e.g. miniaudio) alive and stall
process exit.
"""
if self._engine is not None:
self._engine._running = False
tree = getattr(self._engine, "_scene_tree", None)
if tree is not None:
tree.quit()
# ------------------------------------------------------------------
# Headless rendering (for tests and CI)
# ------------------------------------------------------------------
[docs]
def run_headless(
self,
root_node: Any,
*,
frames: int = 1,
on_frame: Callable[[int, float], bool | None] | None = None,
capture_frames: list[int] | None = (),
capture_fn: Callable[[int], bool] | None = None,
) -> list:
"""Run the engine headlessly for *frames* frames and return captured pixels.
Args:
root_node: Scene root node.
frames: Total number of frames to simulate.
on_frame: Optional callback invoked with (frame_index, time) before each frame.
Return False to stop early.
capture_frames: Which frame indices to capture. Defaults to ``()``: no capture.
Per-frame swapchain readback costs ~4 ms/frame (``vkQueueWaitIdle`` +
``vkDeviceWaitIdle`` + host copy), so the default is no-capture so that
tests/CI/benchmarks measure real frame work, not host-side readback.
Pass an explicit list of frame indices (e.g. ``[frames - 1]`` for the
final frame) to opt in, or ``None`` to capture every frame.
capture_fn: Dynamic capture predicate: called with frame index, captures if True.
Takes precedence over *capture_frames* when provided.
Returns:
List of (H, W, 4) uint8 RGBA numpy arrays, one per captured frame
(empty when the default ``capture_frames=()`` is used).
"""
import numpy as np
from simvx.core import Input, SceneTree
from .draw2d import Draw2D
from .renderer.sub_viewport import SubViewportManager
from .scene_adapter import SceneAdapter
# Reset App-level state from any prior run so a second call starts
# cleanly. Without this, ``_engine`` / ``_scene_adapter`` from a
# previous invocation linger as stale handles after their owning
# Engine has been shut down, and sticky-key state on the global
# ``Input`` singleton can pre-trigger actions on the next scene.
self._engine = None
self._scene_adapter = None
Input._reset()
Draw2D._reset()
self._engine = Engine(
width=self.width,
height=self.height,
title=self.title,
backend=self._backend_name,
visible=False,
vsync=False,
)
# Sync clear colour from theme (same as _run_with_tree)
if self._bg_colour == "transparent":
self._engine.clear_colour = [0.0, 0.0, 0.0, 0.0]
elif isinstance(self._bg_colour, tuple | list):
self._engine.clear_colour = list(self._bg_colour)
else:
from simvx.core.ui.theme import get_theme as _gt
c = _gt().bg_black
self._engine.clear_colour = [c[0], c[1], c[2], c[3]]
tree = SceneTree(screen_size=(self.width, self.height))
self._init_audio_backend(tree)
adapter: SceneAdapter | None = None
captured: list[np.ndarray] = []
frame_idx = 0
physics_dt = 1.0 / self._physics_fps
# Telemetry snapshot captured immediately before the engine exits its
# main loop, while the renderer still exists. ``Engine.run`` calls
# ``shutdown()`` in its finally-block which nulls ``_renderer``, so
# callers that want per-frame counts after ``run_headless`` returns
# would otherwise have to scrape them inside ``on_frame``. Now they
# can read ``app.last_telemetry`` / ``app.last_draw2d_draw_count``
# after the call.
self.last_telemetry = {}
def _capture_telemetry() -> None:
r = self._engine._renderer if self._engine else None
if r is None:
return
t = self.last_telemetry
d2 = getattr(r, "_draw2d_pass", None)
if d2 is not None:
t["draw2d_draw_count"] = int(d2.last_frame_draw_count)
t["gpu_phase_times"] = dict(getattr(self._engine, "gpu_phase_times", {}) or {})
t["frames_rendered"] = frame_idx
def setup():
nonlocal adapter
renderer = self._engine.renderer
adapter = SceneAdapter(self._engine, renderer)
self._scene_adapter = adapter # Exposed for editor game viewport rendering
self._sub_viewports = SubViewportManager(self._engine, adapter)
tree._app = self # type: ignore[attr-defined]
self._engine._scene_tree = tree # type: ignore[attr-defined]
self._engine._scene_adapter = adapter # type: ignore[attr-defined] # for ReflectionProbePass
tree.set_root(root_node)
# The engine loop is: update() → _draw_frame(). Frame N is drawn after
# update N returns. capture_frame() reads the *last drawn* image, so we
# capture frame N-1 at the start of update N. The final frame is captured
# in the extra update call where frame_idx == frames.
def update():
nonlocal frame_idx
# Capture the frame drawn by the previous _draw_frame call
if frame_idx > 0:
prev = frame_idx - 1
should_capture = (
(capture_fn is not None and capture_fn(prev))
or (capture_fn is None and (capture_frames is None or prev in capture_frames))
)
if should_capture:
captured.append(self._engine.renderer.capture_frame())
if frame_idx >= frames:
_capture_telemetry()
self._engine._running = False
return
if on_frame:
if on_frame(frame_idx, frame_idx * physics_dt) is False:
_capture_telemetry()
self._engine._running = False
return
scaled_dt = physics_dt * self._time_scale
tree.physics_tick(scaled_dt)
tree.tick(scaled_dt)
Draw2D._reset()
tree.render(Draw2D)
renderer = self._engine.renderer
renderer.begin_frame()
adapter.submit_scene(tree)
Input._end_frame()
Input._new_frame()
frame_idx += 1
def pre_render_fn(cmd):
renderer = self._engine.renderer
resubmit = bool(
self._sub_viewports is not None and adapter is not None
and self._sub_viewports.render_all(tree)
)
if adapter is not None and renderer.capture_reflection_probes(adapter):
resubmit = True
if resubmit:
renderer.begin_frame()
adapter.submit_scene(tree)
renderer._upload_transforms()
renderer.pre_render(cmd)
def render_fn(cmd, extent):
self._engine.renderer.render(cmd)
def _cleanup_sub_viewports():
if self._sub_viewports is not None:
self._sub_viewports.destroy()
try:
self._engine.run(
callback=update,
setup=setup,
render=render_fn,
pre_render=pre_render_fn,
cleanup=_cleanup_sub_viewports,
)
finally:
self._sub_viewports = None
self._shutdown_audio(tree)
return captured
# ------------------------------------------------------------------
# Streaming (JPEG over WebSocket to browser)
# ------------------------------------------------------------------
[docs]
def run_streaming(
self,
root_node: Any,
server: Any,
) -> None:
"""Run the engine headlessly and stream frames to browsers over WebSocket.
Args:
root_node: Scene root node.
server: A ``StreamingServer`` instance (from ``simvx.graphics.streaming``).
"""
from simvx.core import Input, SceneTree
from .draw2d import Draw2D
from .scene_adapter import SceneAdapter
self._engine = Engine(
width=self.width, height=self.height, title=self.title,
backend=self._backend_name, visible=False, vsync=False,
)
if self._bg_colour == "transparent":
self._engine.clear_colour = [0.0, 0.0, 0.0, 0.0]
elif isinstance(self._bg_colour, tuple | list):
self._engine.clear_colour = list(self._bg_colour)
else:
from simvx.core.ui.theme import get_theme as _gt
c = _gt().bg_black
self._engine.clear_colour = [c[0], c[1], c[2], c[3]]
tree = SceneTree(screen_size=(self.width, self.height))
self._init_audio_backend(tree)
adapter: SceneAdapter | None = None
physics_dt = 1.0 / self._physics_fps
physics_accum = 0.0
last_time = 0.0
last_mouse_pos = (0.0, 0.0)
primary_finger: int | None = None
# Start the streaming server
server.start(self.width, self.height)
def _process_input() -> None:
"""Drain queued input events from server and route to Input + UI."""
nonlocal last_mouse_pos, primary_finger
from .input_adapter import _KEY_MAP
for evt in server.drain_input():
etype = evt.get("type")
if etype == "key":
code = evt.get("code", 0)
pressed = evt.get("pressed", False)
Input._on_key(code, pressed)
key_name = _KEY_MAP.get(code)
if key_name:
if pressed:
if not Input._keys.get(key_name):
Input._keys_just_pressed[key_name] = True
Input._keys[key_name] = True
else:
Input._keys[key_name] = False
Input._keys_just_released[key_name] = True
tree.ui_input(key=key_name, pressed=pressed)
elif etype == "char":
codepoint = evt.get("codepoint", 0)
tree.ui_input(char=chr(codepoint))
elif etype == "mouse":
button = evt.get("button", 0)
pressed = evt.get("pressed", False)
Input._on_mouse_button(button, pressed)
btn = f"mouse_{button + 1}"
if pressed:
if not Input._keys.get(btn):
Input._keys_just_pressed[btn] = True
Input._keys[btn] = True
else:
Input._keys[btn] = False
Input._keys_just_released[btn] = True
tree.ui_input(mouse_pos=Input._mouse_pos, button=MouseButton(button), pressed=pressed)
elif etype == "mousemove":
x, y = evt.get("x", 0.0), evt.get("y", 0.0)
old = Input._mouse_pos
Input._mouse_pos = (x, y)
Input._mouse_delta = (x - old[0], y - old[1])
pos = (x, y)
if pos != last_mouse_pos:
last_mouse_pos = pos
tree.ui_input(mouse_pos=pos, button=None, pressed=False)
elif etype == "scroll":
dx, dy = evt.get("dx", 0.0), evt.get("dy", 0.0)
Input._scroll_delta = (
Input._scroll_delta[0] + dx,
Input._scroll_delta[1] + dy,
)
if dy > 0:
tree.ui_input(key="scroll_up", pressed=True)
elif dy < 0:
tree.ui_input(key="scroll_down", pressed=True)
elif etype == "touch":
finger_id = evt.get("id", 0)
action = evt.get("action", 0)
x, y = evt.get("x", 0.0), evt.get("y", 0.0)
pressure = evt.get("pressure", 1.0)
Input._update_touch(finger_id, action, x, y, pressure)
# Primary finger emulates mouse for UI
if action == 0 and primary_finger is None:
primary_finger = finger_id
Input._mouse_pos = (x, y)
Input._mouse_delta = (0.0, 0.0)
tree.ui_input(mouse_pos=(x, y), button=None, pressed=False)
tree.ui_input(mouse_pos=(x, y), button=MouseButton.LEFT, pressed=True)
elif action == 1 and finger_id == primary_finger:
primary_finger = None
Input._mouse_pos = (x, y)
tree.ui_input(mouse_pos=(x, y), button=None, pressed=False)
tree.ui_input(mouse_pos=(x, y), button=MouseButton.LEFT, pressed=False)
elif action == 2 and finger_id == primary_finger:
old = Input._mouse_pos
Input._mouse_pos = (x, y)
Input._mouse_delta = (x - old[0], y - old[1])
tree.ui_input(mouse_pos=(x, y), button=None, pressed=False)
def setup():
nonlocal adapter, last_time
import time as _time
renderer = self._engine.renderer
adapter = SceneAdapter(self._engine, renderer)
tree._app = self # type: ignore[attr-defined]
self._engine._scene_tree = tree # type: ignore[attr-defined]
self._engine._scene_adapter = adapter # type: ignore[attr-defined] # for ReflectionProbePass
tree.set_root(root_node)
last_time = _time.perf_counter()
_frame_count = 0
def update():
nonlocal physics_accum, last_time, _frame_count
# Capture the PREVIOUS frame (which _draw_frame() has now fully rendered).
# Skip frame 0: no previous frame exists yet.
if _frame_count > 0 and server.has_clients():
import vulkan as _vk
try:
pixels = self._engine.renderer.capture_frame()
server.push_frame(pixels)
except (
_vk.VkErrorOutOfDateKhr,
_vk.VkErrorSurfaceLostKhr,
RuntimeError,
AttributeError,
) as exc:
# Justified: a swapchain mid-resize/teardown (out-of-date / surface-lost),
# or a renderer whose state is being torn down, makes a single frame
# capture transiently fail: streaming simply skips this frame. Logged at
# debug so it stays visible without spamming a per-frame hot path.
log.debug("Frame capture for streaming skipped: %s", exc, exc_info=True)
_frame_count += 1
import time as _time
now = _time.perf_counter()
frame_dt = min(now - last_time, 0.1)
last_time = now
# Process input from browser clients
_process_input()
# Fixed-timestep physics. Wall-clock accumulator is unscaled;
# only the delivered dt is scaled by ``time_scale``.
physics_accum += frame_dt
scaled_physics_dt = physics_dt * self._time_scale
while physics_accum >= physics_dt:
tree.physics_tick(scaled_physics_dt)
physics_accum -= physics_dt
tree.tick(frame_dt * self._time_scale)
Draw2D._reset()
tree.render(Draw2D)
renderer = self._engine.renderer
renderer.begin_frame()
adapter.submit_scene(tree)
Input._end_frame()
Input._new_frame()
def pre_render_fn(cmd):
self._engine.renderer.pre_render(cmd)
def render_fn(cmd, extent):
self._engine.renderer.render(cmd)
try:
self._engine.run(
callback=update, setup=setup,
render=render_fn, pre_render=pre_render_fn,
)
finally:
server.stop()
self._shutdown_audio(tree)