# Source code for simvx.graphics.app

"""Graphics application layer — engine wrapper with node tree support."""


from __future__ import annotations

import logging
import time
from collections.abc import Callable
from typing import Any

from .engine import Engine
from .platform import AVAILABLE_BACKENDS

log = logging.getLogger(__name__)

__all__ = ["App", "AVAILABLE_BACKENDS"]


class _Sdl3Unavailable:
    """Placeholder type returned when the SDL3 backend cannot be imported.

    It is never instantiated, so ``isinstance(obj, _Sdl3Unavailable)`` is
    False for every object -- including ``None``, which ``type(None)`` would
    have matched.
    """


def _get_sdl3_type() -> type:
    """Return the Sdl3Backend class, or a never-matching placeholder type.

    Returns:
        ``Sdl3Backend`` when ``.platform._sdl3`` is importable, otherwise
        ``_Sdl3Unavailable`` so callers can use the result directly in
        ``isinstance()`` without a separate availability check.
    """
    try:
        from .platform._sdl3 import Sdl3Backend
    except ImportError:  # ModuleNotFoundError is a subclass of ImportError
        return _Sdl3Unavailable
    return Sdl3Backend


class App:
    """Graphics application wrapper. Supports both raw callbacks and node tree scenes.

    Args:
        title: Window title string.
        width: Window width in pixels (must be >= 1).
        height: Window height in pixels (must be >= 1).
        backend: Windowing backend name. One of ``"glfw"`` (desktop),
            ``"sdl3"`` (touch/mobile), ``"qt"`` (embedding). ``None`` auto-detects.
        physics_fps: Physics simulation rate (must be > 0).
        visible: Open a visible window. Set ``False`` for headless tests.
        vsync: Enable vertical sync at present time.
        bg_colour: Background clear colour. ``"transparent"``, an RGBA tuple,
            or ``None`` to use the theme background.

    Raises:
        TypeError: If *title* is not a string or *width*/*height* are not integers.
        ValueError: If *width*/*height* are < 1, *physics_fps* is not positive,
            or *backend* is not one of ``AVAILABLE_BACKENDS``.

    Usage with node tree (recommended)::

        app = App(title="My Game", width=1280, height=720)
        app.run(MyGameScene())  # Node subclass with ready()/process()

    Usage with raw callbacks::

        app = App(title="My Game")
        app.run(update=my_update, render=my_render)

    After ``run()`` has created the window, ``window_title``, ``window_size``,
    and ``active_backend`` reflect the live state.
    """

    def __init__(
        self,
        title: str = "SimVX",
        width: int = 1280,
        height: int = 720,
        backend: str | None = None,
        physics_fps: int = 60,
        visible: bool = True,
        vsync: bool = True,
        bg_colour: tuple[float, float, float, float] | str | None = None,
        **_kwargs,
    ):
        if not isinstance(title, str):
            raise TypeError(f"App title must be a string, got {type(title).__name__}")
        if not isinstance(width, int) or not isinstance(height, int):
            raise TypeError("App width and height must be integers")
        if width < 1 or height < 1:
            raise ValueError(f"App width and height must be >= 1, got {width}x{height}")
        # The run loops compute 1.0 / physics_fps and step physics while the
        # accumulator is >= that value: zero would raise ZeroDivisionError at
        # run time and a negative rate would never terminate the stepping loop.
        if not physics_fps > 0:
            raise ValueError(f"App physics_fps must be > 0, got {physics_fps!r}")
        if backend is not None and backend not in AVAILABLE_BACKENDS:
            raise ValueError(
                f"Unknown backend {backend!r}. Allowed: {list(AVAILABLE_BACKENDS)}"
            )
        if _kwargs:
            # Extra keyword arguments are accepted and ignored; leave a trace
            # so a misspelt option is discoverable.
            log.debug("App ignoring unknown keyword arguments: %s", sorted(_kwargs))

        self.title = title
        self.width = width
        self.height = height
        self._backend_name = backend
        self._physics_fps = physics_fps
        self._visible = visible
        self._vsync = vsync
        self._bg_colour = bg_colour
        self._engine: Engine | None = None

    def toggle_fullscreen(self) -> None:
        """Toggle fullscreen mode. No-op until an engine exists."""
        if self._engine is not None:
            self._engine.toggle_fullscreen()

    @property
    def is_fullscreen(self) -> bool:
        """Whether the engine reports fullscreen; ``False`` before an engine exists."""
        return self._engine.is_fullscreen if self._engine is not None else False

    @property
    def engine(self) -> Engine | None:
        """Access the graphics engine (available after run() starts)."""
        return self._engine

    @property
    def active_backend(self) -> str | None:
        """Return the resolved backend name (e.g. ``"glfw"``), or ``None`` if
        the engine has not yet initialised a window."""
        if self._engine is None:
            return None
        # getattr: the engine may exist before it has resolved a backend.
        return getattr(self._engine, "_resolved_backend_name", None)

    @property
    def window_title(self) -> str:
        """Current window title. Writable once the window exists."""
        if self._engine is not None:
            return self._engine.title
        return self.title

    @window_title.setter
    def window_title(self, value: str) -> None:
        if not isinstance(value, str):
            raise TypeError(f"window_title must be a string, got {type(value).__name__}")
        self.title = value
        if self._engine is not None:
            self._engine.title = value
            window = getattr(self._engine, "_window", None)
            if window is not None and hasattr(window, "set_title"):
                window.set_title(value)

    @property
    def window_size(self) -> tuple[int, int]:
        """Current window ``(width, height)``. Writable once the window exists."""
        if self._engine is not None:
            return (self._engine.width, self._engine.height)
        return (self.width, self.height)

    @window_size.setter
    def window_size(self, value: tuple[int, int]) -> None:
        if not isinstance(value, (tuple, list)) or len(value) != 2:
            raise TypeError("window_size must be a (width, height) tuple")
        w, h = int(value[0]), int(value[1])
        if w < 1 or h < 1:
            raise ValueError(f"window_size must be >= 1x1, got {w}x{h}")
        self.width = w
        self.height = h
        if self._engine is not None:
            self._engine.set_window_size(w, h)

    def run(
        self,
        root_or_update: Any = None,
        *,
        update: Callable[[], None] | None = None,
        render: Callable[[object, tuple[int, int]], None] | None = None,
    ) -> None:
        """Run the graphics engine.

        Args:
            root_or_update: Either a Node (scene root) or a legacy update callback.
                When a Node is passed, SceneTree and input are set up automatically.
            update: Legacy per-frame callback (alternative to positional arg).
            render: Custom render callback (cmd, extent). Only used in callback mode.
        """
        from simvx.core import Node

        if isinstance(root_or_update, Node):
            if update is not None or render is not None:
                log.warning(
                    "App.run(): update/render callbacks are ignored when a scene root Node is given"
                )
            self._run_with_tree(root_or_update)
        else:
            cb = root_or_update if root_or_update is not None else update
            self._run_with_callbacks(cb, render)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _fixed_clear_colour(self) -> list[float] | None:
        """Return the explicit clear colour, or ``None`` when it is theme-driven.

        ``"transparent"`` maps to RGBA zero, a tuple/list is copied into a new
        list, and anything else means "follow the theme".
        """
        if self._bg_colour == "transparent":
            return [0.0, 0.0, 0.0, 0.0]
        if isinstance(self._bg_colour, (tuple, list)):
            return list(self._bg_colour)
        return None

    @staticmethod
    def _theme_clear_colour() -> list[float]:
        """Return the current theme's ``bg_black`` as an RGBA list."""
        from simvx.core.ui.theme import get_theme

        c = get_theme().bg_black
        return [c[0], c[1], c[2], c[3]]

    def _create_engine(self, *, visible: bool, vsync: bool) -> Engine:
        """Create the Engine, store it on ``self._engine`` and set its initial clear colour."""
        engine = Engine(
            width=self.width,
            height=self.height,
            title=self.title,
            backend=self._backend_name,
            visible=visible,
            vsync=vsync,
        )
        self._engine = engine
        fixed = self._fixed_clear_colour()
        engine.clear_colour = fixed if fixed is not None else self._theme_clear_colour()
        return engine

    @staticmethod
    def _init_audio_backend(tree) -> None:
        """Attach a MiniaudioBackend to the scene tree (best-effort)."""
        try:
            from simvx.core.audio_backend import MiniaudioBackend

            tree._audio_backend = MiniaudioBackend()  # type: ignore[attr-defined]
        except Exception:
            # No audio — miniaudio unavailable or no audio device. Stay
            # best-effort, but keep the reason available for debugging.
            log.debug("Audio backend unavailable; continuing without audio", exc_info=True)

    @staticmethod
    def _shutdown_audio(tree) -> None:
        """Stop the audio backend so its playback thread doesn't block process exit."""
        backend = getattr(tree, "_audio_backend", None)
        if backend and hasattr(backend, "shutdown"):
            backend.shutdown()

    @staticmethod
    def _wire_clipboard(win) -> None:
        """Register clipboard get/set functions for UI widgets (best-effort).

        Uses SDL3's clipboard when *win* is an Sdl3Backend, otherwise assumes a
        GLFW window wrapper exposing the native handle as ``win._window``.
        """
        if win is None:
            return

        def _as_text(val) -> str:
            # Backends may hand back None, bytes or str.
            if val is None:
                return ""
            return val.decode("utf-8", errors="replace") if isinstance(val, bytes) else str(val)

        try:
            from simvx.core.ui.clipboard import set_backend as _set_clipboard

            if isinstance(win, _get_sdl3_type()):
                # SDL3 clipboard — works without a window handle
                import sdl3 as _sdl

                _set_clipboard(
                    lambda text: _sdl.SDL_SetClipboardText(text.encode()),
                    lambda: _as_text(_sdl.SDL_GetClipboardText()),
                )
            else:
                # GLFW clipboard
                import glfw

                _win = win._window
                _set_clipboard(
                    lambda text: glfw.set_clipboard_string(_win, text),
                    lambda: _as_text(glfw.get_clipboard_string(_win)),
                )
        except Exception:
            log.debug("Clipboard integration unavailable", exc_info=True)

    # ------------------------------------------------------------------
    # Windowed run modes
    # ------------------------------------------------------------------

    def _run_with_callbacks(
        self,
        update: Callable | None,
        render: Callable | None,
    ) -> None:
        """Run with raw update/render callbacks (original behavior)."""
        engine = self._create_engine(visible=self._visible, vsync=self._vsync)
        engine.run(callback=update, render=render)

    def _run_with_tree(self, root_node: Any) -> None:
        """Run with a node tree — full engine integration."""
        from simvx.core import Input, SceneTree

        from .draw2d import Draw2D
        from .input_adapter import (
            char_callback,
            cursor_pos_callback,
            key_callback_with_ui,
            mouse_button_callback_with_ui,
            poll_gamepads,
            scroll_callback,
            set_ui_callbacks,
        )
        from .scene_adapter import SceneAdapter

        engine = self._create_engine(visible=self._visible, vsync=self._vsync)

        # bg_colour: "transparent" / tuple → fixed; None → synced from theme each
        # time the theme generation changes.
        theme_driven = self._fixed_clear_colour() is None
        last_theme_gen = -1  # track theme changes for bg sync

        tree = SceneTree(screen_size=(self.width, self.height))
        self._init_audio_backend(tree)

        adapter: SceneAdapter | None = None
        physics_dt = 1.0 / self._physics_fps
        physics_accum = 0.0
        last_time = 0.0
        last_mouse_pos = (0.0, 0.0)

        # --- UI input routing callbacks ---
        def _ui_char(ch: str):
            tree.ui_input(char=ch)

        def _ui_key(key_name: str, pressed: bool):
            tree.ui_input(key=key_name, pressed=pressed)

        def _ui_mouse(button: int, pressed: bool):
            tree.ui_input(mouse_pos=Input._mouse_pos, button=button, pressed=pressed)

        def _ui_motion(pos: tuple[float, float]):
            nonlocal last_mouse_pos
            if pos != last_mouse_pos:
                last_mouse_pos = pos
                tree.ui_input(mouse_pos=pos, button=0, pressed=False)

        def _ui_touch(finger_id: int, action: int, x: float, y: float):
            tree.touch_input(finger_id, action, x, y)

        def setup():
            nonlocal adapter, last_time

            # Wire UI routing callbacks
            set_ui_callbacks(
                char=_ui_char,
                key=_ui_key,
                mouse=_ui_mouse,
                motion=_ui_motion,
                scroll=_ui_key,  # scroll events use same key routing
                touch=_ui_touch,
            )

            # Wire input (with UI routing)
            engine.set_key_callback(key_callback_with_ui)
            engine.set_mouse_button_callback(mouse_button_callback_with_ui)
            engine.set_cursor_pos_callback(cursor_pos_callback)
            engine.set_scroll_callback(scroll_callback)
            engine.set_char_callback(char_callback)

            # Wire touch callback if backend supports it (SDL3)
            win = engine._window
            if hasattr(win, "set_touch_callback"):
                from .input_adapter import touch_callback

                win.set_touch_callback(touch_callback)

            # Wire clipboard for UI widgets (backend-specific)
            self._wire_clipboard(win)

            # Store HiDPI content scale on theme (informational).
            # Do NOT scale Draw2D._base_height — the 2x framebuffer
            # provides sharper glyphs automatically. Scaling _base_height
            # would double text size in logical coordinates while widget
            # sizes stay at 1x, causing text to overflow containers.
            sx, _sy = engine._content_scale
            if sx > 1.01:
                from simvx.core.ui.theme import get_theme

                get_theme().ui_scale = sx
                get_theme()._sync_dicts()

            # Create renderer + scene adapter
            adapter = SceneAdapter(engine, engine.renderer)

            # Expose app and window handle on the tree for runtime access
            tree._app = self  # type: ignore[attr-defined]
            tree._platform_window = getattr(engine._window, "_window", None)
            engine._scene_tree = tree  # type: ignore[attr-defined]

            # Set scene root (triggers ready() on all nodes)
            tree.set_root(root_node)
            last_time = time.perf_counter()

        def update():
            """Game logic — runs BEFORE any GPU command recording."""
            nonlocal physics_accum, last_time, last_theme_gen
            from simvx.core.debug import Debug

            p = Debug.profiler

            now = time.perf_counter()
            frame_dt = min(now - last_time, 0.1)  # Cap at 100ms
            last_time = now

            p.begin("total")

            # Fixed-timestep physics
            p.begin("physics")
            physics_accum += frame_dt
            while physics_accum >= physics_dt:
                tree.physics_process(physics_dt)
                physics_accum -= physics_dt
            p.end("physics")

            # Poll gamepads
            poll_gamepads(engine._window)

            # Sync screen size with window size (UI coordinates match mouse positions)
            if engine._window:
                ws = engine._window.get_window_size()
                if tree.screen_size != (ws[0], ws[1]):
                    tree.screen_size = (ws[0], ws[1])
                    # Re-query cursor position after resize so Input._mouse_pos
                    # stays consistent with the new window dimensions.
                    if hasattr(engine._window, "get_cursor_pos"):
                        cx, cy = engine._window.get_cursor_pos()
                        Input._mouse_pos = (cx, cy)

            # Sync background colour from theme (when not explicitly overridden)
            if theme_driven:
                from simvx.core.ui.theme import theme_generation as _tg

                gen = _tg()
                if gen != last_theme_gen:
                    last_theme_gen = gen
                    engine.clear_colour = self._theme_clear_colour()

            # Game logic
            p.begin("process")
            tree.process(frame_dt)
            p.end("process")

            # Mouse motion is now dispatched immediately from cursor_pos_callback
            # via _ui.motion callback, so no per-frame dispatch needed here.

            # 2D drawing pass — nodes submit Draw2D geometry
            p.begin("draw")
            Draw2D._reset()
            tree.draw(Draw2D)
            p.end("draw")

            # Mouse picking on click
            if Input._keys_just_pressed.get("mouse_1"):
                tree.input_cast(Input._mouse_pos, button=1)

            # Submit scene data for rendering (populates instances, materials, lights)
            p.begin("submit")
            engine.renderer.begin_frame()
            adapter.submit_scene(tree)
            p.end("submit")

            p.end("total")
            p.end_frame()

            # Clear per-frame input state
            Input._end_frame()

        def pre_render_fn(cmd):
            """Offscreen passes (shadows) — after game logic, before main render pass."""
            engine.renderer.pre_render(cmd)

        def render_fn(cmd, extent):
            """Main render pass — inside vkCmdBeginRenderPass."""
            engine.renderer.render(cmd)

        try:
            engine.run(
                callback=update,
                setup=setup,
                render=render_fn,
                pre_render=pre_render_fn,
            )
        finally:
            # Always stop audio, even if the loop raised, so the playback
            # thread cannot keep the process alive.
            self._shutdown_audio(tree)

    # ------------------------------------------------------------------
    # Headless rendering (for tests and CI)
    # ------------------------------------------------------------------

    def run_headless(
        self,
        root_node: Any,
        *,
        frames: int = 1,
        on_frame: Callable[[int, float], bool | None] | None = None,
        capture_frames: list[int] | None = None,
        capture_fn: Callable[[int], bool] | None = None,
    ) -> list:
        """Run the engine headlessly for *frames* frames and return captured pixels.

        Args:
            root_node: Scene root node.
            frames: Total number of frames to simulate.
            on_frame: Optional callback invoked with (frame_index, time) before each frame.
                Return False to stop early.
            capture_frames: Which frame indices to capture (None = capture all).
            capture_fn: Dynamic capture predicate — called with frame index, captures if True.
                Takes precedence over *capture_frames* when provided.

        Returns:
            List of (H, W, 4) uint8 RGBA numpy arrays.
        """
        from simvx.core import Input, SceneTree

        from .draw2d import Draw2D
        from .scene_adapter import SceneAdapter

        # Clear colour is resolved the same way as in windowed mode.
        engine = self._create_engine(visible=False, vsync=False)

        tree = SceneTree(screen_size=(self.width, self.height))
        self._init_audio_backend(tree)

        adapter: SceneAdapter | None = None
        captured: list[Any] = []
        frame_idx = 0
        physics_dt = 1.0 / self._physics_fps
        # Build the lookup once instead of scanning the list every frame.
        wanted = None if capture_frames is None else frozenset(capture_frames)

        def _should_capture(index: int) -> bool:
            if capture_fn is not None:
                return bool(capture_fn(index))
            return wanted is None or index in wanted

        def setup():
            nonlocal adapter
            adapter = SceneAdapter(engine, engine.renderer)
            tree._app = self  # type: ignore[attr-defined]
            engine._scene_tree = tree  # type: ignore[attr-defined]
            tree.set_root(root_node)

        # The engine loop is: update() → _draw_frame(). Frame N is drawn after
        # update N returns. capture_frame() reads the *last drawn* image, so we
        # capture frame N-1 at the start of update N. The final frame is captured
        # in the extra update call where frame_idx == frames.
        def update():
            nonlocal frame_idx

            # Capture the frame drawn by the previous _draw_frame call
            if frame_idx > 0 and _should_capture(frame_idx - 1):
                captured.append(engine.renderer.capture_frame())

            if frame_idx >= frames:
                engine._running = False
                return

            if on_frame and on_frame(frame_idx, frame_idx * physics_dt) is False:
                engine._running = False
                return

            tree.physics_process(physics_dt)
            tree.process(physics_dt)

            Draw2D._reset()
            tree.draw(Draw2D)

            engine.renderer.begin_frame()
            adapter.submit_scene(tree)

            Input._end_frame()
            frame_idx += 1

        def pre_render_fn(cmd):
            engine.renderer.pre_render(cmd)

        def render_fn(cmd, extent):
            engine.renderer.render(cmd)

        try:
            engine.run(
                callback=update,
                setup=setup,
                render=render_fn,
                pre_render=pre_render_fn,
            )
        finally:
            self._shutdown_audio(tree)
        return captured

    # ------------------------------------------------------------------
    # Streaming (WebRTC to browser)
    # ------------------------------------------------------------------

    def run_streaming(
        self,
        root_node: Any,
        server: Any,
    ) -> None:
        """Run the engine headlessly and stream frames to browsers via WebRTC.

        Args:
            root_node: Scene root node.
            server: A ``StreamingServer`` instance (from ``simvx.graphics.streaming``).
        """
        from simvx.core import Input, SceneTree

        from .draw2d import Draw2D
        from .scene_adapter import SceneAdapter

        engine = self._create_engine(visible=False, vsync=False)

        tree = SceneTree(screen_size=(self.width, self.height))
        self._init_audio_backend(tree)

        adapter: SceneAdapter | None = None
        physics_dt = 1.0 / self._physics_fps
        physics_accum = 0.0
        last_time = 0.0
        last_mouse_pos = (0.0, 0.0)
        primary_finger: int | None = None
        _frame_count = 0

        def _set_key_state(name: str, pressed: bool) -> None:
            """Mirror a press/release into Input's key tables."""
            if pressed:
                if not Input._keys.get(name):
                    Input._keys_just_pressed[name] = True
                Input._keys[name] = True
            else:
                Input._keys[name] = False
                Input._keys_just_released[name] = True

        def _process_input() -> None:
            """Drain queued input events from server and route to Input + UI."""
            nonlocal last_mouse_pos, primary_finger
            from .input_adapter import _KEY_MAP

            for evt in server.drain_input():
                etype = evt.get("type")
                if etype == "key":
                    code = evt.get("code", 0)
                    pressed = evt.get("pressed", False)
                    Input._on_key(code, pressed)
                    key_name = _KEY_MAP.get(code)
                    if key_name:
                        _set_key_state(key_name, pressed)
                        tree.ui_input(key=key_name, pressed=pressed)
                elif etype == "char":
                    codepoint = evt.get("codepoint", 0)
                    # Browser input is untrusted: chr() raises ValueError outside
                    # 0..0x10FFFF, and a missing/zero codepoint is not text.
                    if isinstance(codepoint, int) and 0 < codepoint <= 0x10FFFF:
                        tree.ui_input(char=chr(codepoint))
                    else:
                        log.debug("Ignoring char event with invalid codepoint %r", codepoint)
                elif etype == "mouse":
                    button = evt.get("button", 0)
                    pressed = evt.get("pressed", False)
                    Input._on_mouse_button(button, pressed)
                    _set_key_state(f"mouse_{button + 1}", pressed)
                    tree.ui_input(mouse_pos=Input._mouse_pos, button=button + 1, pressed=pressed)
                elif etype == "mousemove":
                    x, y = evt.get("x", 0.0), evt.get("y", 0.0)
                    old = Input._mouse_pos
                    Input._mouse_pos = (x, y)
                    Input._mouse_delta = (x - old[0], y - old[1])
                    pos = (x, y)
                    if pos != last_mouse_pos:
                        last_mouse_pos = pos
                        tree.ui_input(mouse_pos=pos, button=0, pressed=False)
                elif etype == "scroll":
                    dx, dy = evt.get("dx", 0.0), evt.get("dy", 0.0)
                    Input._scroll_delta = (
                        Input._scroll_delta[0] + dx,
                        Input._scroll_delta[1] + dy,
                    )
                    if dy > 0:
                        tree.ui_input(key="scroll_up", pressed=True)
                    elif dy < 0:
                        tree.ui_input(key="scroll_down", pressed=True)
                elif etype == "touch":
                    finger_id = evt.get("id", 0)
                    action = evt.get("action", 0)
                    x, y = evt.get("x", 0.0), evt.get("y", 0.0)
                    pressure = evt.get("pressure", 1.0)
                    Input._update_touch(finger_id, action, x, y, pressure)
                    # Primary finger emulates mouse for UI
                    if action == 0 and primary_finger is None:
                        primary_finger = finger_id
                        Input._mouse_pos = (x, y)
                        Input._mouse_delta = (0.0, 0.0)
                        tree.ui_input(mouse_pos=(x, y), button=0, pressed=False)
                        tree.ui_input(mouse_pos=(x, y), button=1, pressed=True)
                    elif action == 1 and finger_id == primary_finger:
                        primary_finger = None
                        Input._mouse_pos = (x, y)
                        tree.ui_input(mouse_pos=(x, y), button=0, pressed=False)
                        tree.ui_input(mouse_pos=(x, y), button=1, pressed=False)
                    elif action == 2 and finger_id == primary_finger:
                        old = Input._mouse_pos
                        Input._mouse_pos = (x, y)
                        Input._mouse_delta = (x - old[0], y - old[1])
                        tree.ui_input(mouse_pos=(x, y), button=0, pressed=False)

        def setup():
            nonlocal adapter, last_time
            adapter = SceneAdapter(engine, engine.renderer)
            tree._app = self  # type: ignore[attr-defined]
            engine._scene_tree = tree  # type: ignore[attr-defined]
            tree.set_root(root_node)
            last_time = time.perf_counter()

        def update():
            nonlocal physics_accum, last_time, _frame_count

            # Capture the PREVIOUS frame (which _draw_frame() has now fully rendered).
            # Skip frame 0 — no previous frame exists yet.
            if _frame_count > 0 and server.has_clients():
                try:
                    server.push_frame(engine.renderer.capture_frame())
                except Exception:
                    # Frame capture can fail during resize/shutdown; drop the
                    # frame rather than stopping the stream.
                    log.debug("Dropped streaming frame", exc_info=True)
            _frame_count += 1

            now = time.perf_counter()
            frame_dt = min(now - last_time, 0.1)
            last_time = now

            # Process input from browser clients
            _process_input()

            # Fixed-timestep physics
            physics_accum += frame_dt
            while physics_accum >= physics_dt:
                tree.physics_process(physics_dt)
                physics_accum -= physics_dt

            tree.process(frame_dt)

            Draw2D._reset()
            tree.draw(Draw2D)

            engine.renderer.begin_frame()
            adapter.submit_scene(tree)

            Input._end_frame()

        def pre_render_fn(cmd):
            engine.renderer.pre_render(cmd)

        def render_fn(cmd, extent):
            engine.renderer.render(cmd)

        try:
            # Start the streaming server
            server.start(self.width, self.height)
            try:
                engine.run(
                    callback=update,
                    setup=setup,
                    render=render_fn,
                    pre_render=pre_render_fn,
                )
            finally:
                server.stop()
        finally:
            # Runs even if server.start()/stop() raised.
            self._shutdown_audio(tree)