Source code for simvx.graphics.app

"""Graphics application layer: engine wrapper with node tree support."""

import logging
import time
import weakref
from collections.abc import Callable
from typing import Any

from .engine import Engine
from .platform import AVAILABLE_BACKENDS

log = logging.getLogger(__name__)

__all__ = ["App", "AVAILABLE_BACKENDS"]

def _get_sdl3_type() -> type:
    """Return the Sdl3Backend class, or a dummy type if not importable."""
    try:
        from .platform._sdl3 import Sdl3Backend
        return Sdl3Backend
    except (ImportError, ModuleNotFoundError):
        return type(None)  # will never match isinstance()

[docs] class App: """Graphics application wrapper. Supports both raw callbacks and node tree scenes. Args: title: Window title string. width: Window width in pixels (must be >= 1). height: Window height in pixels (must be >= 1). backend: Windowing backend name. One of ``"glfw"`` (desktop), ``"sdl3"`` (touch/mobile), ``"qt"`` (embedding). ``None`` auto-detects. physics_fps: Fixed-timestep physics simulation rate (Hz). Independent of the display rate; the run loop drives physics via an accumulator so changing this never affects render cadence. target_fps: Maximum display frame rate. ``None`` (default) means uncapped (vsync-gated only). Mirrors Godot's ``Engine.max_fps``, Unity's ``Application.targetFrameRate``, and Bevy's ``FrameRateLimit::Limit``. Combine with ``vsync=False`` to make the cap dominant on high-refresh displays. visible: Open a visible window. Set ``False`` for headless tests. vsync: Enable vertical sync at present time. bg_colour: Background clear colour. ``"transparent"``, an RGBA tuple, or ``None`` to use the theme background. Usage with node tree (recommended):: app = App(title="My Game", width=1280, height=720) app.run(MyGameScene()) # Node subclass with ready()/process() Usage with raw callbacks:: app = App(title="My Game") app.run(update=my_update, render=my_render) After ``run()`` has created the window, ``window_title``, ``window_size``, and ``active_backend`` reflect the live state. """ # Weakref to the most recently constructed App, exposed via App.current(). # Lets non-Node helpers (audio code, utilities) reach the running app # without a circular dependency. Held as a weakref so test suites that # spin up many short-lived Apps don't leak them. _current_ref: weakref.ReferenceType[App] | None = None
[docs] @classmethod def current(cls) -> App | None: """Return the running App instance, or ``None`` if no App is alive. Prefer ``node.app`` from inside Node subclasses (after ``on_enter_tree``); use this from non-Node helpers like audio callbacks, utility modules, or tools that don't have a node handle. """ ref = cls._current_ref return ref() if ref is not None else None
def __init__( self, title: str = "SimVX", width: int = 1280, height: int = 720, backend: str | None = None, physics_fps: int = 60, target_fps: int | None = None, visible: bool = True, vsync: bool = True, bg_colour: tuple[float, float, float, float] | str | None = None, audio_sample_rate: int = 48000, audio_channels: int = 2, **_kwargs, ): if not isinstance(title, str): raise TypeError(f"App title must be a string, got {type(title).__name__}") if not isinstance(width, int) or not isinstance(height, int): raise TypeError("App width and height must be integers") if width < 1 or height < 1: raise ValueError(f"App width and height must be >= 1, got {width}x{height}") if target_fps is not None and target_fps < 1: raise ValueError(f"target_fps must be >= 1 or None, got {target_fps}") if backend is not None and backend not in AVAILABLE_BACKENDS: raise ValueError( f"Unknown backend {backend!r}. Allowed: {list(AVAILABLE_BACKENDS)}" ) if not isinstance(audio_sample_rate, int) or audio_sample_rate <= 0: raise ValueError( f"audio_sample_rate must be a positive int, got {audio_sample_rate!r}" ) if audio_channels not in (1, 2): raise ValueError( f"audio_channels must be 1 (mono) or 2 (stereo), got {audio_channels!r}" ) self.title = title self.width = width self.height = height self._backend_name = backend self._physics_fps = physics_fps self._target_fps = target_fps self._visible = visible self._vsync = vsync self._bg_colour = bg_colour self._audio_sample_rate = audio_sample_rate self._audio_channels = audio_channels self._engine: Engine | None = None # Per-run SubViewport offscreen render-target manager (graphics-side # driver for core.SubViewport render-to-texture). Created in setup(). self._sub_viewports: Any = None # Global slow-mo / hitstop / fast-forward multiplier applied to the # per-frame ``dt`` *before* it reaches ``tree.tick()`` and # ``tree.physics_tick()``. ``1.0`` = real-time; ``0.3`` = 30% # speed; ``0.0`` = paused (tick/physics still run, but with zero # dt). Audio pitching and coroutine pacing are intentionally NOT # affected by this knob: only the scene-tree dt is scaled. self._time_scale: float = 1.0 # Snapshot of per-frame telemetry from the most recent # ``run_headless`` call. Captured BEFORE the engine shuts down so # tests / harnesses can read renderer counters after the call # returns (the engine nulls its ``_renderer`` in shutdown). # Keys populated today: # ``draw2d_draw_count`` : Draw2DPass.last_frame_draw_count of the # final fully rendered frame. # ``gpu_phase_times`` : per-pass GPU timings (label → ms), # copied from ``Engine.gpu_phase_times``. # ``frames_rendered`` : total update() calls that produced a frame. self.last_telemetry: dict[str, Any] = {} type(self)._current_ref = weakref.ref(self)
[docs] def toggle_fullscreen(self) -> None: """Toggle fullscreen mode.""" if self._engine: self._engine.toggle_fullscreen()
[docs] @property def is_fullscreen(self) -> bool: return self._engine.is_fullscreen if self._engine else False
[docs] @property def engine(self) -> Engine | None: """Access the graphics engine (available after run() starts).""" return self._engine
[docs] @property def texture_manager(self): """The renderer's :class:`TextureManager` (available after run() starts). Convenience accessor: forwards to ``app.engine.texture_manager`` so scenes that upload runtime-generated pixels (e.g. rasterised text) don't need to reach into ``_engine``. """ return self._engine.texture_manager if self._engine is not None else None
[docs] @property def scene_adapter(self): """Scene adapter bridging SceneTree to the renderer (available after run() starts).""" return getattr(self, "_scene_adapter", None)
[docs] @property def active_backend(self) -> str | None: """Return the resolved backend name (e.g. ``"glfw"``), or ``None`` if the engine has not yet initialised a window.""" if self._engine is None: return None return self._engine._resolved_backend_name
@property def window_title(self) -> str: """Current window title. Writable once the window exists.""" if self._engine is not None: return self._engine.title return self.title
[docs] @window_title.setter def window_title(self, value: str) -> None: if not isinstance(value, str): raise TypeError(f"window_title must be a string, got {type(value).__name__}") self.title = value if self._engine is not None: self._engine.title = value window = getattr(self._engine, "_window", None) if window is not None and hasattr(window, "set_title"): window.set_title(value)
@property def window_size(self) -> tuple[int, int]: """Current window ``(width, height)``. Writable once the window exists.""" if self._engine is not None: return (self._engine.width, self._engine.height) return (self.width, self.height)
[docs] @window_size.setter def window_size(self, value: tuple[int, int]) -> None: if not isinstance(value, tuple | list) or len(value) != 2: raise TypeError("window_size must be a (width, height) tuple") w, h = int(value[0]), int(value[1]) if w < 1 or h < 1: raise ValueError(f"window_size must be >= 1x1, got {w}x{h}") self.width = w self.height = h if self._engine is not None: self._engine.window_size = (w, h)
[docs] @property def cursor_pos(self) -> tuple[float, float]: """Current cursor position in screen coordinates, ``(0.0, 0.0)`` before run().""" if self._engine is not None: return self._engine.cursor_pos return (0.0, 0.0)
[docs] @property def vsync(self) -> bool: """Whether vertical sync is currently enabled.""" if self._engine is not None: return self._engine.vsync return self._vsync
[docs] def set_vsync(self, value: bool) -> None: """Toggle vsync at runtime. Before ``run()`` this just stores the boot-time value. After ``run()``, the engine recreates the swapchain with the new present mode (FIFO when on, MAILBOX/IMMEDIATE when off). """ value = bool(value) self._vsync = value engine = getattr(self, "_engine", None) if engine is not None: engine.set_vsync(value)
@property def time_scale(self) -> float: """Global time-scale multiplier for the scene-tree tick. Applied to ``dt`` before delivery to ``tree.process()`` and ``tree.physics_process()``. ``1.0`` is real-time, ``0.3`` is 30% speed (slow-motion / hitstop), ``2.0`` is double speed, ``0.0`` is paused (still ticks, with zero dt). Default ``1.0``. Scope is intentionally narrow: only the scene-tree dt is scaled. Audio playback rate, coroutine pacing, and the wall-clock physics accumulator are NOT touched: slowing audio + coroutines requires a separate design pass. Read :attr:`simvx.core.SceneTree.now` for the scaled scene clock. """ return self._time_scale
[docs] @time_scale.setter def time_scale(self, value: float) -> None: v = float(value) if v < 0.0: raise ValueError(f"App.time_scale must be >= 0, got {v}") self._time_scale = v
[docs] @property def last_draw2d_draw_count(self) -> int | None: """``Draw2DPass.last_frame_draw_count`` from the most recent ``run_headless`` call's final rendered frame. ``None`` if no run has happened yet or the renderer reported no Draw2D pass for this scene. """ return self.last_telemetry.get("draw2d_draw_count")
[docs] def run( self, root_or_update: Any = None, *, update: Callable[[], None] | None = None, render: Callable[[object, tuple[int, int]], None] | None = None, ) -> None: """Run the graphics engine. Args: root_or_update: Either a Node (scene root) or a per-frame update callback. When a Node is passed, SceneTree and input are set up automatically. update: Per-frame callback (alternative to the positional arg). render: Custom render callback (cmd, extent). Only used in callback mode. """ from simvx.core import Node if isinstance(root_or_update, Node): self._run_with_tree(root_or_update) else: cb = root_or_update or update self._run_with_callbacks(cb, render)
def _init_audio_backend(self, tree) -> None: """Attach an audio backend to the scene tree (best-effort). `make_backend()` prefers the native ma_engine path (built via `simvx build-audio`) and falls back to the pure-Python mixer if the native extension isn't loaded, so games still produce sound on systems without a C compiler at install time. Sample rate and channel count come from the `App(audio_sample_rate=, audio_channels=)` constructor kwargs. """ try: from simvx.core.audio_backend import make_backend tree.install_audio_backend( make_backend( sample_rate=self._audio_sample_rate, nchannels=self._audio_channels, ) ) except (ImportError, OSError) as exc: # Justified: the audio_backend module (or its native dependency) may be # absent, or no audio device is present: the engine still runs silently. # ``make_backend`` already degrades to the null backend internally, so the # only escapes here are an import failure or a device-level OSError. A # deliberate ``AudioBackendUnavailable`` (SIMVX_ALLOW_LEGACY_AUDIO=0) is # NOT caught: it is a fail-loud opt-in and must propagate. log.warning("Audio backend unavailable; running without sound: %s", exc, exc_info=True) @staticmethod def _shutdown_audio(tree) -> None: """Stop the audio backend so its playback thread doesn't block process exit.""" backend = getattr(tree, "_audio_backend", None) if backend and hasattr(backend, "shutdown"): backend.shutdown() def _run_with_callbacks( self, update: Callable | None, render: Callable | None, ) -> None: """Run with raw per-frame update/render callbacks (no scene tree).""" self._engine = Engine( width=self.width, height=self.height, title=self.title, backend=self._backend_name, visible=self._visible, vsync=self._vsync, target_fps=self._target_fps, ) if self._bg_colour == "transparent": self._engine.clear_colour = [0.0, 0.0, 0.0, 0.0] elif isinstance(self._bg_colour, tuple | list): self._engine.clear_colour = list(self._bg_colour) else: from simvx.core.ui.theme import get_theme as _gt c = _gt().bg_black self._engine.clear_colour = [c[0], c[1], c[2], c[3]] self._engine.run(callback=update, render=render) def _run_with_tree(self, root_node: Any) -> None: """Run with a node tree: full engine integration.""" from simvx.core import Input, MouseButton, SceneTree from .draw2d import Draw2D from .input_adapter import ( char_callback, cursor_pos_callback, key_callback_with_ui, mouse_button_callback_with_ui, poll_gamepads, scroll_callback, set_ui_callbacks, ) from .renderer.sub_viewport import SubViewportManager from .scene_adapter import SceneAdapter self._engine = Engine( width=self.width, height=self.height, title=self.title, backend=self._backend_name, visible=self._visible, vsync=self._vsync, target_fps=self._target_fps, ) # Resolve bg_colour: "transparent" → RGBA zero, tuple → fixed, None → theme-driven if self._bg_colour == "transparent": fixed_bg: list[float] | None = [0.0, 0.0, 0.0, 0.0] elif isinstance(self._bg_colour, tuple | list): fixed_bg = list(self._bg_colour) else: fixed_bg = None # syncs from theme each frame if fixed_bg is not None: self._engine.clear_colour = fixed_bg else: # Set initial clear colour from theme immediately from simvx.core.ui.theme import get_theme as _init_gt c = _init_gt().bg_black self._engine.clear_colour = [c[0], c[1], c[2], c[3]] last_theme_gen = -1 # track theme changes for bg sync tree = SceneTree(screen_size=(self.width, self.height)) self._init_audio_backend(tree) adapter: SceneAdapter | None = None physics_dt = 1.0 / self._physics_fps physics_accum = 0.0 last_time = 0.0 last_mouse_pos = (0.0, 0.0) # --- UI input routing callbacks --- def _ui_char(ch: str): tree.ui_input(char=ch) def _ui_key(key_name: str, pressed: bool): tree.ui_input(key=key_name, pressed=pressed) def _ui_mouse(button: MouseButton, pressed: bool): tree.ui_input(mouse_pos=Input._mouse_pos, button=button, pressed=pressed) def _ui_motion(pos: tuple[float, float]): nonlocal last_mouse_pos if pos != last_mouse_pos: last_mouse_pos = pos tree.ui_input(mouse_pos=pos, button=None, pressed=False) def _ui_touch(finger_id: int, action: int, x: float, y: float): tree.touch_input(finger_id, action, x, y) def setup(): nonlocal adapter, last_time # Wire UI routing callbacks. tree_propagate dispatches every input # event to the SceneTree's @on_input handlers via typed dispatch # tables (see SceneTree.propagate_input). set_ui_callbacks( char=_ui_char, key=_ui_key, mouse=_ui_mouse, motion=_ui_motion, scroll=_ui_key, # scroll events use same key routing touch=_ui_touch, tree_propagate=tree.propagate_input, ) # Wire input (with UI routing) self._engine.set_key_callback(key_callback_with_ui) self._engine.set_mouse_button_callback(mouse_button_callback_with_ui) self._engine.set_cursor_pos_callback(cursor_pos_callback) self._engine.set_scroll_callback(scroll_callback) self._engine.set_char_callback(char_callback) # Wire touch callback if backend supports it (SDL3) win = self._engine._window if hasattr(win, "set_touch_callback"): from .input_adapter import touch_callback win.set_touch_callback(touch_callback) # Wire clipboard for UI widgets (backend-specific) try: from simvx.core.ui.clipboard import set_backend as _set_clipboard win = self._engine._window if isinstance(win, _get_sdl3_type()): # SDL3 clipboard: works without a window handle import sdl3 as _sdl def _sdl3_paste(): val = _sdl.SDL_GetClipboardText() if val is None: return "" return val.decode("utf-8", errors="replace") if isinstance(val, bytes) else str(val) _set_clipboard(lambda text: _sdl.SDL_SetClipboardText(text.encode()), _sdl3_paste) else: # GLFW clipboard import glfw _win = win._window def _glfw_paste(): val = glfw.get_clipboard_string(_win) if val is None: return "" return val.decode("utf-8", errors="replace") if isinstance(val, bytes) else str(val) _set_clipboard(lambda text: glfw.set_clipboard_string(_win, text), _glfw_paste) except (ImportError, AttributeError) as exc: # Justified: the platform clipboard backend (sdl3/glfw) may be missing or # lack an expected symbol; UI widgets fall back to a no-op clipboard. Logged # at debug since this is a minor, non-fatal degradation. log.debug("Clipboard backend unavailable; using no-op clipboard: %s", exc, exc_info=True) # Store HiDPI content scale on theme (informational). # Do NOT scale Draw2D._base_height: the 2x framebuffer # provides sharper glyphs automatically. Scaling _base_height # would double text size in logical coordinates while widget # sizes stay at 1x, causing text to overflow containers. sx, _sy = self._engine.content_scale if sx > 1.01: from simvx.core.ui.theme import get_theme get_theme().ui_scale = sx get_theme()._sync_dicts() # Create renderer + scene adapter renderer = self._engine.renderer adapter = SceneAdapter(self._engine, renderer) self._scene_adapter = adapter # Exposed for editor game viewport rendering self._sub_viewports = SubViewportManager(self._engine, adapter) # Expose app and window handle on the tree for runtime access tree._app = self # type: ignore[attr-defined] tree._platform_window = getattr(self._engine._window, "_window", None) self._engine._scene_tree = tree # type: ignore[attr-defined] self._engine._scene_adapter = adapter # type: ignore[attr-defined] # for ReflectionProbePass # Set scene root (triggers ready() on all nodes) tree.set_root(root_node) last_time = time.perf_counter() def update(): """Game logic: runs BEFORE any GPU command recording.""" nonlocal physics_accum, last_time, last_theme_gen from simvx.core.debug import Debug p = Debug.profiler now = time.perf_counter() frame_dt = min(now - last_time, 0.1) # Cap at 100ms last_time = now p.begin("total") # Fixed-timestep physics. The wall-clock accumulator runs at # real-time; only the dt delivered to nodes is scaled by # ``time_scale`` so slow-motion does not stretch the physics # accumulator and pile up steps when the scale returns to 1.0. p.begin("physics") physics_accum += frame_dt scaled_physics_dt = physics_dt * self._time_scale while physics_accum >= physics_dt: tree.physics_tick(scaled_physics_dt) physics_accum -= physics_dt p.end("physics") # Poll gamepads poll_gamepads(self._engine._window) # Sync screen size with window size (UI coordinates match mouse positions) if self._engine._window: ws = self._engine._window.get_window_size() if tree.screen_size != (ws[0], ws[1]): tree.screen_size = (ws[0], ws[1]) # Re-query cursor position after resize so Input._mouse_pos # stays consistent with the new window dimensions. if hasattr(self._engine._window, "get_cursor_pos"): cx, cy = self._engine._window.get_cursor_pos() Input._mouse_pos = (cx, cy) # Sync background colour from theme (when not explicitly overridden) if fixed_bg is None: from simvx.core.ui.theme import get_theme as _gt from simvx.core.ui.theme import theme_generation as _tg gen = _tg() if gen != last_theme_gen: last_theme_gen = gen c = _gt().bg_black self._engine.clear_colour = [c[0], c[1], c[2], c[3]] # Game logic: scale dt by App.time_scale so slow-mo and # hitstop slow node ``on_process`` callbacks and SceneTree.now. p.begin("process") tree.tick(frame_dt * self._time_scale) p.end("process") # Mouse motion is now dispatched immediately from cursor_pos_callback # via _ui.motion callback, so no per-frame dispatch needed here. # 2D drawing pass: nodes submit Draw2D geometry p.begin("draw") Draw2D._reset() tree.render(Draw2D) p.end("draw") # Mouse picking on click if Input._keys_just_pressed.get("mouse_1"): tree.input_cast(Input._mouse_pos, button=MouseButton.LEFT) # Submit scene data for rendering (populates instances, materials, lights) p.begin("submit") renderer = self._engine.renderer renderer.begin_frame() adapter.submit_scene(tree) p.end("submit") p.end("total") p.end_frame() # Clear per-frame input state. ``_end_frame`` closes out the just- # press/release edges consumed by this tick; ``_new_frame`` re-arms # the typed buffers so any platform events that fire between now # and the next tick (gamepad polling, injected events, etc.) start # from a clean slate. Mirrors ``SceneRunner.advance_frames``. Input._end_frame() Input._new_frame() def pre_render_fn(cmd): """Offscreen passes (shadows): after game logic, before main render pass.""" renderer = self._engine.renderer # Render SubViewports into their offscreen textures first (on a # dedicated, waited one-time command buffer), so the main pass can # sample them this frame. That path runs begin_frame + submit_scene # on each sub-tree, clobbering the main scene's submission lists, # so re-submit the main scene afterwards. Only pay the re-submit # cost when at least one SubViewport actually rendered. resubmit = bool( self._sub_viewports is not None and adapter is not None and self._sub_viewports.render_all(tree) ) # Capture any new / requested reflection probes (also clobbers the # main scene's submission lists, so re-submit afterwards). if adapter is not None and renderer.capture_reflection_probes(adapter): resubmit = True if resubmit: renderer.begin_frame() adapter.submit_scene(tree) renderer._upload_transforms() renderer.pre_render(cmd) def render_fn(cmd, extent): """Main render pass: inside vkCmdBeginRenderPass.""" renderer = self._engine.renderer renderer.render(cmd) def _cleanup_sub_viewports(): """Release SubViewport offscreen targets while the device is alive.""" if self._sub_viewports is not None: self._sub_viewports.destroy() try: self._engine.run( callback=update, setup=setup, render=render_fn, pre_render=pre_render_fn, cleanup=_cleanup_sub_viewports, ) finally: self._sub_viewports = None self._shutdown_audio(tree)
[docs] def quit(self) -> None: """Request a clean shutdown of the running app. Safe to call from process/physics_process or input handlers. The main loop exits at the end of the current frame; audio and Vulkan resources are torn down in the usual finally-paths. Prefer this over ``sys.exit`` which can leave background threads (e.g. miniaudio) alive and stall process exit. """ if self._engine is not None: self._engine._running = False tree = getattr(self._engine, "_scene_tree", None) if tree is not None: tree.quit()
# ------------------------------------------------------------------ # Headless rendering (for tests and CI) # ------------------------------------------------------------------
[docs] def run_headless( self, root_node: Any, *, frames: int = 1, on_frame: Callable[[int, float], bool | None] | None = None, capture_frames: list[int] | None = (), capture_fn: Callable[[int], bool] | None = None, ) -> list: """Run the engine headlessly for *frames* frames and return captured pixels. Args: root_node: Scene root node. frames: Total number of frames to simulate. on_frame: Optional callback invoked with (frame_index, time) before each frame. Return False to stop early. capture_frames: Which frame indices to capture. Defaults to ``()``: no capture. Per-frame swapchain readback costs ~4 ms/frame (``vkQueueWaitIdle`` + ``vkDeviceWaitIdle`` + host copy), so the default is no-capture so that tests/CI/benchmarks measure real frame work, not host-side readback. Pass an explicit list of frame indices (e.g. ``[frames - 1]`` for the final frame) to opt in, or ``None`` to capture every frame. capture_fn: Dynamic capture predicate: called with frame index, captures if True. Takes precedence over *capture_frames* when provided. Returns: List of (H, W, 4) uint8 RGBA numpy arrays, one per captured frame (empty when the default ``capture_frames=()`` is used). """ import numpy as np from simvx.core import Input, SceneTree from .draw2d import Draw2D from .renderer.sub_viewport import SubViewportManager from .scene_adapter import SceneAdapter # Reset App-level state from any prior run so a second call starts # cleanly. Without this, ``_engine`` / ``_scene_adapter`` from a # previous invocation linger as stale handles after their owning # Engine has been shut down, and sticky-key state on the global # ``Input`` singleton can pre-trigger actions on the next scene. self._engine = None self._scene_adapter = None Input._reset() Draw2D._reset() self._engine = Engine( width=self.width, height=self.height, title=self.title, backend=self._backend_name, visible=False, vsync=False, ) # Sync clear colour from theme (same as _run_with_tree) if self._bg_colour == "transparent": self._engine.clear_colour = [0.0, 0.0, 0.0, 0.0] elif isinstance(self._bg_colour, tuple | list): self._engine.clear_colour = list(self._bg_colour) else: from simvx.core.ui.theme import get_theme as _gt c = _gt().bg_black self._engine.clear_colour = [c[0], c[1], c[2], c[3]] tree = SceneTree(screen_size=(self.width, self.height)) self._init_audio_backend(tree) adapter: SceneAdapter | None = None captured: list[np.ndarray] = [] frame_idx = 0 physics_dt = 1.0 / self._physics_fps # Telemetry snapshot captured immediately before the engine exits its # main loop, while the renderer still exists. ``Engine.run`` calls # ``shutdown()`` in its finally-block which nulls ``_renderer``, so # callers that want per-frame counts after ``run_headless`` returns # would otherwise have to scrape them inside ``on_frame``. Now they # can read ``app.last_telemetry`` / ``app.last_draw2d_draw_count`` # after the call. self.last_telemetry = {} def _capture_telemetry() -> None: r = self._engine._renderer if self._engine else None if r is None: return t = self.last_telemetry d2 = getattr(r, "_draw2d_pass", None) if d2 is not None: t["draw2d_draw_count"] = int(d2.last_frame_draw_count) t["gpu_phase_times"] = dict(getattr(self._engine, "gpu_phase_times", {}) or {}) t["frames_rendered"] = frame_idx def setup(): nonlocal adapter renderer = self._engine.renderer adapter = SceneAdapter(self._engine, renderer) self._scene_adapter = adapter # Exposed for editor game viewport rendering self._sub_viewports = SubViewportManager(self._engine, adapter) tree._app = self # type: ignore[attr-defined] self._engine._scene_tree = tree # type: ignore[attr-defined] self._engine._scene_adapter = adapter # type: ignore[attr-defined] # for ReflectionProbePass tree.set_root(root_node) # The engine loop is: update() → _draw_frame(). Frame N is drawn after # update N returns. capture_frame() reads the *last drawn* image, so we # capture frame N-1 at the start of update N. The final frame is captured # in the extra update call where frame_idx == frames. def update(): nonlocal frame_idx # Capture the frame drawn by the previous _draw_frame call if frame_idx > 0: prev = frame_idx - 1 should_capture = ( (capture_fn is not None and capture_fn(prev)) or (capture_fn is None and (capture_frames is None or prev in capture_frames)) ) if should_capture: captured.append(self._engine.renderer.capture_frame()) if frame_idx >= frames: _capture_telemetry() self._engine._running = False return if on_frame: if on_frame(frame_idx, frame_idx * physics_dt) is False: _capture_telemetry() self._engine._running = False return scaled_dt = physics_dt * self._time_scale tree.physics_tick(scaled_dt) tree.tick(scaled_dt) Draw2D._reset() tree.render(Draw2D) renderer = self._engine.renderer renderer.begin_frame() adapter.submit_scene(tree) Input._end_frame() Input._new_frame() frame_idx += 1 def pre_render_fn(cmd): renderer = self._engine.renderer resubmit = bool( self._sub_viewports is not None and adapter is not None and self._sub_viewports.render_all(tree) ) if adapter is not None and renderer.capture_reflection_probes(adapter): resubmit = True if resubmit: renderer.begin_frame() adapter.submit_scene(tree) renderer._upload_transforms() renderer.pre_render(cmd) def render_fn(cmd, extent): self._engine.renderer.render(cmd) def _cleanup_sub_viewports(): if self._sub_viewports is not None: self._sub_viewports.destroy() try: self._engine.run( callback=update, setup=setup, render=render_fn, pre_render=pre_render_fn, cleanup=_cleanup_sub_viewports, ) finally: self._sub_viewports = None self._shutdown_audio(tree) return captured
# ------------------------------------------------------------------ # Streaming (JPEG over WebSocket to browser) # ------------------------------------------------------------------
[docs] def run_streaming( self, root_node: Any, server: Any, ) -> None: """Run the engine headlessly and stream frames to browsers over WebSocket. Args: root_node: Scene root node. server: A ``StreamingServer`` instance (from ``simvx.graphics.streaming``). """ from simvx.core import Input, SceneTree from .draw2d import Draw2D from .scene_adapter import SceneAdapter self._engine = Engine( width=self.width, height=self.height, title=self.title, backend=self._backend_name, visible=False, vsync=False, ) if self._bg_colour == "transparent": self._engine.clear_colour = [0.0, 0.0, 0.0, 0.0] elif isinstance(self._bg_colour, tuple | list): self._engine.clear_colour = list(self._bg_colour) else: from simvx.core.ui.theme import get_theme as _gt c = _gt().bg_black self._engine.clear_colour = [c[0], c[1], c[2], c[3]] tree = SceneTree(screen_size=(self.width, self.height)) self._init_audio_backend(tree) adapter: SceneAdapter | None = None physics_dt = 1.0 / self._physics_fps physics_accum = 0.0 last_time = 0.0 last_mouse_pos = (0.0, 0.0) primary_finger: int | None = None # Start the streaming server server.start(self.width, self.height) def _process_input() -> None: """Drain queued input events from server and route to Input + UI.""" nonlocal last_mouse_pos, primary_finger from .input_adapter import _KEY_MAP for evt in server.drain_input(): etype = evt.get("type") if etype == "key": code = evt.get("code", 0) pressed = evt.get("pressed", False) Input._on_key(code, pressed) key_name = _KEY_MAP.get(code) if key_name: if pressed: if not Input._keys.get(key_name): Input._keys_just_pressed[key_name] = True Input._keys[key_name] = True else: Input._keys[key_name] = False Input._keys_just_released[key_name] = True tree.ui_input(key=key_name, pressed=pressed) elif etype == "char": codepoint = evt.get("codepoint", 0) tree.ui_input(char=chr(codepoint)) elif etype == "mouse": button = evt.get("button", 0) pressed = evt.get("pressed", False) Input._on_mouse_button(button, pressed) btn = f"mouse_{button + 1}" if pressed: if not Input._keys.get(btn): Input._keys_just_pressed[btn] = True Input._keys[btn] = True else: Input._keys[btn] = False Input._keys_just_released[btn] = True tree.ui_input(mouse_pos=Input._mouse_pos, button=MouseButton(button), pressed=pressed) elif etype == "mousemove": x, y = evt.get("x", 0.0), evt.get("y", 0.0) old = Input._mouse_pos Input._mouse_pos = (x, y) Input._mouse_delta = (x - old[0], y - old[1]) pos = (x, y) if pos != last_mouse_pos: last_mouse_pos = pos tree.ui_input(mouse_pos=pos, button=None, pressed=False) elif etype == "scroll": dx, dy = evt.get("dx", 0.0), evt.get("dy", 0.0) Input._scroll_delta = ( Input._scroll_delta[0] + dx, Input._scroll_delta[1] + dy, ) if dy > 0: tree.ui_input(key="scroll_up", pressed=True) elif dy < 0: tree.ui_input(key="scroll_down", pressed=True) elif etype == "touch": finger_id = evt.get("id", 0) action = evt.get("action", 0) x, y = evt.get("x", 0.0), evt.get("y", 0.0) pressure = evt.get("pressure", 1.0) Input._update_touch(finger_id, action, x, y, pressure) # Primary finger emulates mouse for UI if action == 0 and primary_finger is None: primary_finger = finger_id Input._mouse_pos = (x, y) Input._mouse_delta = (0.0, 0.0) tree.ui_input(mouse_pos=(x, y), button=None, pressed=False) tree.ui_input(mouse_pos=(x, y), button=MouseButton.LEFT, pressed=True) elif action == 1 and finger_id == primary_finger: primary_finger = None Input._mouse_pos = (x, y) tree.ui_input(mouse_pos=(x, y), button=None, pressed=False) tree.ui_input(mouse_pos=(x, y), button=MouseButton.LEFT, pressed=False) elif action == 2 and finger_id == primary_finger: old = Input._mouse_pos Input._mouse_pos = (x, y) Input._mouse_delta = (x - old[0], y - old[1]) tree.ui_input(mouse_pos=(x, y), button=None, pressed=False) def setup(): nonlocal adapter, last_time import time as _time renderer = self._engine.renderer adapter = SceneAdapter(self._engine, renderer) tree._app = self # type: ignore[attr-defined] self._engine._scene_tree = tree # type: ignore[attr-defined] self._engine._scene_adapter = adapter # type: ignore[attr-defined] # for ReflectionProbePass tree.set_root(root_node) last_time = _time.perf_counter() _frame_count = 0 def update(): nonlocal physics_accum, last_time, _frame_count # Capture the PREVIOUS frame (which _draw_frame() has now fully rendered). # Skip frame 0: no previous frame exists yet. if _frame_count > 0 and server.has_clients(): import vulkan as _vk try: pixels = self._engine.renderer.capture_frame() server.push_frame(pixels) except ( _vk.VkErrorOutOfDateKhr, _vk.VkErrorSurfaceLostKhr, RuntimeError, AttributeError, ) as exc: # Justified: a swapchain mid-resize/teardown (out-of-date / surface-lost), # or a renderer whose state is being torn down, makes a single frame # capture transiently fail: streaming simply skips this frame. Logged at # debug so it stays visible without spamming a per-frame hot path. log.debug("Frame capture for streaming skipped: %s", exc, exc_info=True) _frame_count += 1 import time as _time now = _time.perf_counter() frame_dt = min(now - last_time, 0.1) last_time = now # Process input from browser clients _process_input() # Fixed-timestep physics. Wall-clock accumulator is unscaled; # only the delivered dt is scaled by ``time_scale``. physics_accum += frame_dt scaled_physics_dt = physics_dt * self._time_scale while physics_accum >= physics_dt: tree.physics_tick(scaled_physics_dt) physics_accum -= physics_dt tree.tick(frame_dt * self._time_scale) Draw2D._reset() tree.render(Draw2D) renderer = self._engine.renderer renderer.begin_frame() adapter.submit_scene(tree) Input._end_frame() Input._new_frame() def pre_render_fn(cmd): self._engine.renderer.pre_render(cmd) def render_fn(cmd, extent): self._engine.renderer.render(cmd) try: self._engine.run( callback=update, setup=setup, render=render_fn, pre_render=pre_render_fn, ) finally: server.stop() self._shutdown_audio(tree)