"""Graphics application layer — engine wrapper with node tree support."""
from __future__ import annotations
import logging
import time
from collections.abc import Callable
from typing import Any
from .engine import Engine
from .platform import AVAILABLE_BACKENDS
log = logging.getLogger(__name__)
__all__ = ["App", "AVAILABLE_BACKENDS"]
def _get_sdl3_type() -> type:
"""Return the Sdl3Backend class, or a dummy type if not importable."""
try:
from .platform._sdl3 import Sdl3Backend
return Sdl3Backend
except (ImportError, ModuleNotFoundError):
return type(None) # will never match isinstance()
[docs]
class App:
"""Graphics application wrapper. Supports both raw callbacks and node tree scenes.
Args:
title: Window title string.
width: Window width in pixels (must be >= 1).
height: Window height in pixels (must be >= 1).
backend: Windowing backend name. One of ``"glfw"`` (desktop),
``"sdl3"`` (touch/mobile), ``"qt"`` (embedding). ``None`` auto-detects.
physics_fps: Physics simulation rate.
visible: Open a visible window. Set ``False`` for headless tests.
vsync: Enable vertical sync at present time.
bg_colour: Background clear colour. ``"transparent"``, an RGBA tuple,
or ``None`` to use the theme background.
Usage with node tree (recommended)::
app = App(title="My Game", width=1280, height=720)
app.run(MyGameScene()) # Node subclass with ready()/process()
Usage with raw callbacks::
app = App(title="My Game")
app.run(update=my_update, render=my_render)
After ``run()`` has created the window, ``window_title``, ``window_size``,
and ``active_backend`` reflect the live state.
"""
def __init__(
self,
title: str = "SimVX",
width: int = 1280,
height: int = 720,
backend: str | None = None,
physics_fps: int = 60,
visible: bool = True,
vsync: bool = True,
bg_colour: tuple[float, float, float, float] | str | None = None,
**_kwargs,
):
if not isinstance(title, str):
raise TypeError(f"App title must be a string, got {type(title).__name__}")
if not isinstance(width, int) or not isinstance(height, int):
raise TypeError("App width and height must be integers")
if width < 1 or height < 1:
raise ValueError(f"App width and height must be >= 1, got {width}x{height}")
if backend is not None and backend not in AVAILABLE_BACKENDS:
raise ValueError(
f"Unknown backend {backend!r}. Allowed: {list(AVAILABLE_BACKENDS)}"
)
self.title = title
self.width = width
self.height = height
self._backend_name = backend
self._physics_fps = physics_fps
self._visible = visible
self._vsync = vsync
self._bg_colour = bg_colour
self._engine: Engine | None = None
[docs]
def toggle_fullscreen(self) -> None:
"""Toggle fullscreen mode."""
if self._engine:
self._engine.toggle_fullscreen()
@property
def is_fullscreen(self) -> bool:
return self._engine.is_fullscreen if self._engine else False
@property
def engine(self) -> Engine | None:
"""Access the graphics engine (available after run() starts)."""
return self._engine
@property
def active_backend(self) -> str | None:
"""Return the resolved backend name (e.g. ``"glfw"``), or ``None`` if
the engine has not yet initialised a window."""
if self._engine is None:
return None
return self._engine._resolved_backend_name
@property
def window_title(self) -> str:
"""Current window title. Writable once the window exists."""
if self._engine is not None:
return self._engine.title
return self.title
@window_title.setter
def window_title(self, value: str) -> None:
if not isinstance(value, str):
raise TypeError(f"window_title must be a string, got {type(value).__name__}")
self.title = value
if self._engine is not None:
self._engine.title = value
window = getattr(self._engine, "_window", None)
if window is not None and hasattr(window, "set_title"):
window.set_title(value)
@property
def window_size(self) -> tuple[int, int]:
"""Current window ``(width, height)``. Writable once the window exists."""
if self._engine is not None:
return (self._engine.width, self._engine.height)
return (self.width, self.height)
@window_size.setter
def window_size(self, value: tuple[int, int]) -> None:
if not isinstance(value, tuple | list) or len(value) != 2:
raise TypeError("window_size must be a (width, height) tuple")
w, h = int(value[0]), int(value[1])
if w < 1 or h < 1:
raise ValueError(f"window_size must be >= 1x1, got {w}x{h}")
self.width = w
self.height = h
if self._engine is not None:
self._engine.set_window_size(w, h)
[docs]
def run(
self,
root_or_update: Any = None,
*,
update: Callable[[], None] | None = None,
render: Callable[[object, tuple[int, int]], None] | None = None,
) -> None:
"""Run the graphics engine.
Args:
root_or_update: Either a Node (scene root) or a legacy update callback.
When a Node is passed, SceneTree and input are set up automatically.
update: Legacy per-frame callback (alternative to positional arg).
render: Custom render callback (cmd, extent). Only used in callback mode.
"""
from simvx.core import Node
if isinstance(root_or_update, Node):
self._run_with_tree(root_or_update)
else:
cb = root_or_update or update
self._run_with_callbacks(cb, render)
@staticmethod
def _init_audio_backend(tree) -> None:
"""Attach a MiniaudioBackend to the scene tree (best-effort)."""
try:
from simvx.core.audio_backend import MiniaudioBackend
tree._audio_backend = MiniaudioBackend() # type: ignore[attr-defined]
except Exception:
pass # No audio — miniaudio unavailable or no audio device
@staticmethod
def _shutdown_audio(tree) -> None:
"""Stop the audio backend so its playback thread doesn't block process exit."""
backend = getattr(tree, "_audio_backend", None)
if backend and hasattr(backend, "shutdown"):
backend.shutdown()
def _run_with_callbacks(
self,
update: Callable | None,
render: Callable | None,
) -> None:
"""Run with raw update/render callbacks (original behavior)."""
self._engine = Engine(
width=self.width,
height=self.height,
title=self.title,
backend=self._backend_name,
visible=self._visible,
vsync=self._vsync,
)
if self._bg_colour == "transparent":
self._engine.clear_colour = [0.0, 0.0, 0.0, 0.0]
elif isinstance(self._bg_colour, tuple | list):
self._engine.clear_colour = list(self._bg_colour)
else:
from simvx.core.ui.theme import get_theme as _gt
c = _gt().bg_black
self._engine.clear_colour = [c[0], c[1], c[2], c[3]]
self._engine.run(callback=update, render=render)
def _run_with_tree(self, root_node: Any) -> None:
"""Run with a node tree — full engine integration."""
from simvx.core import Input, SceneTree
from .draw2d import Draw2D
from .input_adapter import (
char_callback,
cursor_pos_callback,
key_callback_with_ui,
mouse_button_callback_with_ui,
poll_gamepads,
scroll_callback,
set_ui_callbacks,
)
from .scene_adapter import SceneAdapter
self._engine = Engine(
width=self.width,
height=self.height,
title=self.title,
backend=self._backend_name,
visible=self._visible,
vsync=self._vsync,
)
# Resolve bg_colour: "transparent" → RGBA zero, tuple → fixed, None → theme-driven
if self._bg_colour == "transparent":
fixed_bg: list[float] | None = [0.0, 0.0, 0.0, 0.0]
elif isinstance(self._bg_colour, tuple | list):
fixed_bg = list(self._bg_colour)
else:
fixed_bg = None # syncs from theme each frame
if fixed_bg is not None:
self._engine.clear_colour = fixed_bg
else:
# Set initial clear colour from theme immediately
from simvx.core.ui.theme import get_theme as _init_gt
c = _init_gt().bg_black
self._engine.clear_colour = [c[0], c[1], c[2], c[3]]
last_theme_gen = -1 # track theme changes for bg sync
tree = SceneTree(screen_size=(self.width, self.height))
self._init_audio_backend(tree)
adapter: SceneAdapter | None = None
physics_dt = 1.0 / self._physics_fps
physics_accum = 0.0
last_time = 0.0
last_mouse_pos = (0.0, 0.0)
# --- UI input routing callbacks ---
def _ui_char(ch: str):
tree.ui_input(char=ch)
def _ui_key(key_name: str, pressed: bool):
tree.ui_input(key=key_name, pressed=pressed)
def _ui_mouse(button: int, pressed: bool):
tree.ui_input(mouse_pos=Input._mouse_pos, button=button, pressed=pressed)
def _ui_motion(pos: tuple[float, float]):
nonlocal last_mouse_pos
if pos != last_mouse_pos:
last_mouse_pos = pos
tree.ui_input(mouse_pos=pos, button=0, pressed=False)
def _ui_touch(finger_id: int, action: int, x: float, y: float):
tree.touch_input(finger_id, action, x, y)
def setup():
nonlocal adapter, last_time
# Wire UI routing callbacks
set_ui_callbacks(
char=_ui_char,
key=_ui_key,
mouse=_ui_mouse,
motion=_ui_motion,
scroll=_ui_key, # scroll events use same key routing
touch=_ui_touch,
)
# Wire input (with UI routing)
self._engine.set_key_callback(key_callback_with_ui)
self._engine.set_mouse_button_callback(mouse_button_callback_with_ui)
self._engine.set_cursor_pos_callback(cursor_pos_callback)
self._engine.set_scroll_callback(scroll_callback)
self._engine.set_char_callback(char_callback)
# Wire touch callback if backend supports it (SDL3)
win = self._engine._window
if hasattr(win, "set_touch_callback"):
from .input_adapter import touch_callback
win.set_touch_callback(touch_callback)
# Wire clipboard for UI widgets (backend-specific)
try:
from simvx.core.ui.clipboard import set_backend as _set_clipboard
win = self._engine._window
if isinstance(win, _get_sdl3_type()):
# SDL3 clipboard — works without a window handle
import sdl3 as _sdl
def _sdl3_paste():
val = _sdl.SDL_GetClipboardText()
if val is None:
return ""
return val.decode("utf-8", errors="replace") if isinstance(val, bytes) else str(val)
_set_clipboard(lambda text: _sdl.SDL_SetClipboardText(text.encode()), _sdl3_paste)
else:
# GLFW clipboard
import glfw
_win = win._window
def _glfw_paste():
val = glfw.get_clipboard_string(_win)
if val is None:
return ""
return val.decode("utf-8", errors="replace") if isinstance(val, bytes) else str(val)
_set_clipboard(lambda text: glfw.set_clipboard_string(_win, text), _glfw_paste)
except Exception:
pass
# Store HiDPI content scale on theme (informational).
# Do NOT scale Draw2D._base_height — the 2x framebuffer
# provides sharper glyphs automatically. Scaling _base_height
# would double text size in logical coordinates while widget
# sizes stay at 1x, causing text to overflow containers.
sx, _sy = self._engine._content_scale
if sx > 1.01:
from simvx.core.ui.theme import get_theme
get_theme().ui_scale = sx
get_theme()._sync_dicts()
# Create renderer + scene adapter
renderer = self._engine.renderer
adapter = SceneAdapter(self._engine, renderer)
# Expose app and window handle on the tree for runtime access
tree._app = self # type: ignore[attr-defined]
tree._platform_window = getattr(self._engine._window, "_window", None)
self._engine._scene_tree = tree # type: ignore[attr-defined]
# Set scene root (triggers ready() on all nodes)
tree.set_root(root_node)
last_time = time.perf_counter()
def update():
"""Game logic — runs BEFORE any GPU command recording."""
nonlocal physics_accum, last_time, last_theme_gen
from simvx.core.debug import Debug
p = Debug.profiler
now = time.perf_counter()
frame_dt = min(now - last_time, 0.1) # Cap at 100ms
last_time = now
p.begin("total")
# Fixed-timestep physics
p.begin("physics")
physics_accum += frame_dt
while physics_accum >= physics_dt:
tree.physics_process(physics_dt)
physics_accum -= physics_dt
p.end("physics")
# Poll gamepads
poll_gamepads(self._engine._window)
# Sync screen size with window size (UI coordinates match mouse positions)
if self._engine._window:
ws = self._engine._window.get_window_size()
if tree.screen_size != (ws[0], ws[1]):
tree.screen_size = (ws[0], ws[1])
# Re-query cursor position after resize so Input._mouse_pos
# stays consistent with the new window dimensions.
if hasattr(self._engine._window, "get_cursor_pos"):
cx, cy = self._engine._window.get_cursor_pos()
Input._mouse_pos = (cx, cy)
# Sync background colour from theme (when not explicitly overridden)
if fixed_bg is None:
from simvx.core.ui.theme import get_theme as _gt, theme_generation as _tg
gen = _tg()
if gen != last_theme_gen:
last_theme_gen = gen
c = _gt().bg_black
self._engine.clear_colour = [c[0], c[1], c[2], c[3]]
# Game logic
p.begin("process")
tree.process(frame_dt)
p.end("process")
# Mouse motion is now dispatched immediately from cursor_pos_callback
# via _ui.motion callback, so no per-frame dispatch needed here.
# 2D drawing pass — nodes submit Draw2D geometry
p.begin("draw")
Draw2D._reset()
tree.draw(Draw2D)
p.end("draw")
# Mouse picking on click
if Input._keys_just_pressed.get("mouse_1"):
tree.input_cast(Input._mouse_pos, button=1)
# Submit scene data for rendering (populates instances, materials, lights)
p.begin("submit")
renderer = self._engine.renderer
renderer.begin_frame()
adapter.submit_scene(tree)
p.end("submit")
p.end("total")
p.end_frame()
# Clear per-frame input state
Input._end_frame()
def pre_render_fn(cmd):
"""Offscreen passes (shadows) — after game logic, before main render pass."""
renderer = self._engine.renderer
renderer.pre_render(cmd)
def render_fn(cmd, extent):
"""Main render pass — inside vkCmdBeginRenderPass."""
renderer = self._engine.renderer
renderer.render(cmd)
self._engine.run(
callback=update,
setup=setup,
render=render_fn,
pre_render=pre_render_fn,
)
self._shutdown_audio(tree)
# ------------------------------------------------------------------
# Headless rendering (for tests and CI)
# ------------------------------------------------------------------
[docs]
def run_headless(
self,
root_node: Any,
*,
frames: int = 1,
on_frame: Callable[[int, float], bool | None] | None = None,
capture_frames: list[int] | None = None,
capture_fn: Callable[[int], bool] | None = None,
) -> list:
"""Run the engine headlessly for *frames* frames and return captured pixels.
Args:
root_node: Scene root node.
frames: Total number of frames to simulate.
on_frame: Optional callback invoked with (frame_index, time) before each frame.
Return False to stop early.
capture_frames: Which frame indices to capture (None = capture all).
capture_fn: Dynamic capture predicate — called with frame index, captures if True.
Takes precedence over *capture_frames* when provided.
Returns:
List of (H, W, 4) uint8 RGBA numpy arrays.
"""
import numpy as np
from simvx.core import Input, SceneTree
from .draw2d import Draw2D
from .scene_adapter import SceneAdapter
self._engine = Engine(
width=self.width,
height=self.height,
title=self.title,
backend=self._backend_name,
visible=False,
vsync=False,
)
# Sync clear colour from theme (same as _run_with_tree)
if self._bg_colour == "transparent":
self._engine.clear_colour = [0.0, 0.0, 0.0, 0.0]
elif isinstance(self._bg_colour, tuple | list):
self._engine.clear_colour = list(self._bg_colour)
else:
from simvx.core.ui.theme import get_theme as _gt
c = _gt().bg_black
self._engine.clear_colour = [c[0], c[1], c[2], c[3]]
tree = SceneTree(screen_size=(self.width, self.height))
self._init_audio_backend(tree)
adapter: SceneAdapter | None = None
captured: list[np.ndarray] = []
frame_idx = 0
physics_dt = 1.0 / self._physics_fps
def setup():
nonlocal adapter
renderer = self._engine.renderer
adapter = SceneAdapter(self._engine, renderer)
tree._app = self # type: ignore[attr-defined]
self._engine._scene_tree = tree # type: ignore[attr-defined]
tree.set_root(root_node)
# The engine loop is: update() → _draw_frame(). Frame N is drawn after
# update N returns. capture_frame() reads the *last drawn* image, so we
# capture frame N-1 at the start of update N. The final frame is captured
# in the extra update call where frame_idx == frames.
def update():
nonlocal frame_idx
# Capture the frame drawn by the previous _draw_frame call
if frame_idx > 0:
prev = frame_idx - 1
should_capture = (
(capture_fn is not None and capture_fn(prev))
or (capture_fn is None and (capture_frames is None or prev in capture_frames))
)
if should_capture:
captured.append(self._engine.renderer.capture_frame())
if frame_idx >= frames:
self._engine._running = False
return
if on_frame:
if on_frame(frame_idx, frame_idx * physics_dt) is False:
self._engine._running = False
return
tree.physics_process(physics_dt)
tree.process(physics_dt)
Draw2D._reset()
tree.draw(Draw2D)
renderer = self._engine.renderer
renderer.begin_frame()
adapter.submit_scene(tree)
Input._end_frame()
frame_idx += 1
def pre_render_fn(cmd):
self._engine.renderer.pre_render(cmd)
def render_fn(cmd, extent):
self._engine.renderer.render(cmd)
self._engine.run(
callback=update,
setup=setup,
render=render_fn,
pre_render=pre_render_fn,
)
self._shutdown_audio(tree)
return captured
# ------------------------------------------------------------------
# Streaming (WebRTC to browser)
# ------------------------------------------------------------------
[docs]
def run_streaming(
self,
root_node: Any,
server: Any,
) -> None:
"""Run the engine headlessly and stream frames to browsers via WebRTC.
Args:
root_node: Scene root node.
server: A ``StreamingServer`` instance (from ``simvx.graphics.streaming``).
"""
from simvx.core import Input, SceneTree
from .draw2d import Draw2D
from .scene_adapter import SceneAdapter
self._engine = Engine(
width=self.width, height=self.height, title=self.title,
backend=self._backend_name, visible=False, vsync=False,
)
if self._bg_colour == "transparent":
self._engine.clear_colour = [0.0, 0.0, 0.0, 0.0]
elif isinstance(self._bg_colour, tuple | list):
self._engine.clear_colour = list(self._bg_colour)
else:
from simvx.core.ui.theme import get_theme as _gt
c = _gt().bg_black
self._engine.clear_colour = [c[0], c[1], c[2], c[3]]
tree = SceneTree(screen_size=(self.width, self.height))
self._init_audio_backend(tree)
adapter: SceneAdapter | None = None
physics_dt = 1.0 / self._physics_fps
physics_accum = 0.0
last_time = 0.0
last_mouse_pos = (0.0, 0.0)
primary_finger: int | None = None
# Start the streaming server
server.start(self.width, self.height)
def _process_input() -> None:
"""Drain queued input events from server and route to Input + UI."""
nonlocal last_mouse_pos, primary_finger
from .input_adapter import _KEY_MAP
for evt in server.drain_input():
etype = evt.get("type")
if etype == "key":
code = evt.get("code", 0)
pressed = evt.get("pressed", False)
Input._on_key(code, pressed)
key_name = _KEY_MAP.get(code)
if key_name:
if pressed:
if not Input._keys.get(key_name):
Input._keys_just_pressed[key_name] = True
Input._keys[key_name] = True
else:
Input._keys[key_name] = False
Input._keys_just_released[key_name] = True
tree.ui_input(key=key_name, pressed=pressed)
elif etype == "char":
codepoint = evt.get("codepoint", 0)
tree.ui_input(char=chr(codepoint))
elif etype == "mouse":
button = evt.get("button", 0)
pressed = evt.get("pressed", False)
Input._on_mouse_button(button, pressed)
btn = f"mouse_{button + 1}"
if pressed:
if not Input._keys.get(btn):
Input._keys_just_pressed[btn] = True
Input._keys[btn] = True
else:
Input._keys[btn] = False
Input._keys_just_released[btn] = True
tree.ui_input(mouse_pos=Input._mouse_pos, button=button + 1, pressed=pressed)
elif etype == "mousemove":
x, y = evt.get("x", 0.0), evt.get("y", 0.0)
old = Input._mouse_pos
Input._mouse_pos = (x, y)
Input._mouse_delta = (x - old[0], y - old[1])
pos = (x, y)
if pos != last_mouse_pos:
last_mouse_pos = pos
tree.ui_input(mouse_pos=pos, button=0, pressed=False)
elif etype == "scroll":
dx, dy = evt.get("dx", 0.0), evt.get("dy", 0.0)
Input._scroll_delta = (
Input._scroll_delta[0] + dx,
Input._scroll_delta[1] + dy,
)
if dy > 0:
tree.ui_input(key="scroll_up", pressed=True)
elif dy < 0:
tree.ui_input(key="scroll_down", pressed=True)
elif etype == "touch":
finger_id = evt.get("id", 0)
action = evt.get("action", 0)
x, y = evt.get("x", 0.0), evt.get("y", 0.0)
pressure = evt.get("pressure", 1.0)
Input._update_touch(finger_id, action, x, y, pressure)
# Primary finger emulates mouse for UI
if action == 0 and primary_finger is None:
primary_finger = finger_id
Input._mouse_pos = (x, y)
Input._mouse_delta = (0.0, 0.0)
tree.ui_input(mouse_pos=(x, y), button=0, pressed=False)
tree.ui_input(mouse_pos=(x, y), button=1, pressed=True)
elif action == 1 and finger_id == primary_finger:
primary_finger = None
Input._mouse_pos = (x, y)
tree.ui_input(mouse_pos=(x, y), button=0, pressed=False)
tree.ui_input(mouse_pos=(x, y), button=1, pressed=False)
elif action == 2 and finger_id == primary_finger:
old = Input._mouse_pos
Input._mouse_pos = (x, y)
Input._mouse_delta = (x - old[0], y - old[1])
tree.ui_input(mouse_pos=(x, y), button=0, pressed=False)
def setup():
nonlocal adapter, last_time
import time as _time
renderer = self._engine.renderer
adapter = SceneAdapter(self._engine, renderer)
tree._app = self # type: ignore[attr-defined]
self._engine._scene_tree = tree # type: ignore[attr-defined]
tree.set_root(root_node)
last_time = _time.perf_counter()
_frame_count = 0
def update():
nonlocal physics_accum, last_time, _frame_count
# Capture the PREVIOUS frame (which _draw_frame() has now fully rendered).
# Skip frame 0 — no previous frame exists yet.
if _frame_count > 0 and server.has_clients():
try:
pixels = self._engine.renderer.capture_frame()
server.push_frame(pixels)
except Exception:
pass # Frame capture can fail during resize/shutdown
_frame_count += 1
import time as _time
now = _time.perf_counter()
frame_dt = min(now - last_time, 0.1)
last_time = now
# Process input from browser clients
_process_input()
# Fixed-timestep physics
physics_accum += frame_dt
while physics_accum >= physics_dt:
tree.physics_process(physics_dt)
physics_accum -= physics_dt
tree.process(frame_dt)
Draw2D._reset()
tree.draw(Draw2D)
renderer = self._engine.renderer
renderer.begin_frame()
adapter.submit_scene(tree)
Input._end_frame()
def pre_render_fn(cmd):
self._engine.renderer.pre_render(cmd)
def render_fn(cmd, extent):
self._engine.renderer.render(cmd)
try:
self._engine.run(
callback=update, setup=setup,
render=render_fn, pre_render=pre_render_fn,
)
finally:
server.stop()
self._shutdown_audio(tree)