Source code for simvx.graphics.frame_loop

"""FrameLoop: the single per-frame engine drive seam (clock + sink + step).

Historically ``App._run_with_tree`` (visible), ``App.run_headless``,
``App.run_streaming``, and the editor's PlayMode each hand-rolled the same
``physics -> interpolate -> tick -> Draw2D -> submit_scene -> present/capture``
core plus an identical pipelined ``frame_driver``. FrameLoop owns that core once;
the per-loop differences are expressed as three pluggable pieces:

- **Clock**: how wall time advances and the physics-stepping strategy
  (``RealTimeClock`` accumulator vs ``FixedStepClock`` single-step).
- **Sink**: where the rendered frame goes and how capture is wired
  (``SwapchainSink`` present, ``OffscreenSink`` readback, ``StreamSink`` push).
- **Integration**: input source + windowed extras (callbacks, gamepad, resize,
  picking) -- supplied per loop.

The LLM agent live-session and the editor viewport are just ``FixedStepClock``
(externally driven) + an offscreen or swapchain sink. This module is migrated
one consumer at a time; ``run_headless`` is the first to land on it.
"""

from __future__ import annotations

import logging
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from .app import App
    from .engine import Engine

log = logging.getLogger(__name__)


class _RenderThreadController:
    """Lets ``Engine._rebuild_device`` quiesce + respawn FrameLoop's render thread.

    Installed on the engine only when running pipelined. ``stop`` closes the packet
    ring and joins the thread (so no stale packet referencing dead handles is
    recorded against the device being destroyed); ``start`` spins up a fresh ring +
    thread on the rebuilt device. Keeping this on the loop side means the engine
    never imports the render-thread machinery for a synchronous run.
    """

    def __init__(self, loop: FrameLoop) -> None:
        self._loop = loop

    def stop(self) -> None:
        self._loop._stop_render_thread()

    def start(self) -> None:
        self._loop._start_render_thread()


#: Loops that have built their engine + tree and not yet torn down. Lets an external
#: introspector (e.g. a PEP 768 injected bootstrap) discover the running loop in-process
#: without the game cooperating. Pure stdlib; insertion-ordered (most recent last).
_active_loops: list[FrameLoop] = []


[docs] def active_loops() -> list[FrameLoop]: """Snapshot of currently-running :class:`FrameLoop` instances (most recent last).""" return list(_active_loops)
# ============================================================================ # Clock -- time advance + physics stepping strategy # ============================================================================
[docs] class FrameClock: """Decides per-frame dt and how physics/logic advance."""
[docs] def on_begin(self, app: App) -> None: # noqa: D401 - hook """Called once after setup, before the first frame."""
[docs] def should_stop(self, frame_idx: int) -> bool: return False
[docs] def advance(self, app: App) -> float: raise NotImplementedError
[docs] def step_physics(self, app: App, tree: Any, frame_dt: float) -> None: raise NotImplementedError
[docs] def tick_logic(self, app: App, tree: Any, frame_dt: float) -> None: tree.tick(frame_dt * app._time_scale)
[docs] class FixedStepClock(FrameClock): """Fixed timestep: one physics step per frame, interpolate at the step boundary. With ``frames`` set this is the headless batch clock; with ``frames=None`` and an explicit ``dt`` it is the externally-driven clock the agent / editor use (the caller advances frames, no internal cap). """ def __init__(self, *, frames: int | None = None, dt: float | None = None) -> None: self.frames = frames self._dt = dt
[docs] def should_stop(self, frame_idx: int) -> bool: return self.frames is not None and frame_idx >= self.frames
[docs] def advance(self, app: App) -> float: return self._dt if self._dt is not None else 1.0 / app._physics_fps
[docs] def step_physics(self, app: App, tree: Any, frame_dt: float) -> None: scaled = frame_dt * app._time_scale tree.physics_tick(scaled) # alpha=1 -> current pose; headless captures at the step boundary so no # real interpolation is needed (byte-identical to the immediate write). tree.interpolate_physics(1.0)
[docs] def tick_logic(self, app: App, tree: Any, frame_dt: float) -> None: tree.tick(frame_dt * app._time_scale)
[docs] class RealTimeClock(FrameClock): """Wall-clock dt with a fixed-timestep physics accumulator (windowed / streaming).""" def __init__(self) -> None: self._last = 0.0 self._accum = 0.0
[docs] def on_begin(self, app: App) -> None: import time self._last = time.perf_counter()
[docs] def advance(self, app: App) -> float: import time now = time.perf_counter() frame_dt = min(now - self._last, 0.1) # cap at 100ms self._last = now return frame_dt
[docs] def step_physics(self, app: App, tree: Any, frame_dt: float) -> None: physics_dt = 1.0 / app._physics_fps self._accum += frame_dt scaled_physics_dt = physics_dt * app._time_scale while self._accum >= physics_dt: tree.physics_tick(scaled_physics_dt) self._accum -= physics_dt tree.interpolate_physics(self._accum / physics_dt)
# ============================================================================ # Sink -- where the rendered frame goes + capture wiring # ============================================================================
[docs] class FrameSink: """Where the frame goes; supplies the render-thread capture + sync readback.""" #: RenderPacketRing capacity: 2 overlaps frames (windowed/stream), 1 lock-steps #: the producer with the render thread so per-frame capture is exact (headless). ring_capacity: int = 2
[docs] def render_thread_capture(self) -> Callable[[int, Any], None] | None: """Callback the pipelined RenderThread runs after present (or None).""" return None
[docs] def capture_synchronous(self, loop: FrameLoop, frame_idx: int) -> None: """Synchronous-mode readback of the just-drawn frame (no-op by default)."""
[docs] class SwapchainSink(FrameSink): """Present to the window; no readback. Windowed games.""" ring_capacity = 2
[docs] class OffscreenSink(FrameSink): """Read frames back into ``captured`` (headless tests, agent capture).""" ring_capacity = 1 def __init__(self, *, capture_frames: Sequence[int] | None = (), capture_fn: Callable[[int], bool] | None = None): self.capture_frames = capture_frames self.capture_fn = capture_fn self.captured: list[Any] = [] def _should_capture(self, idx: int) -> bool: if self.capture_fn is not None: return bool(self.capture_fn(idx)) return self.capture_frames is None or idx in self.capture_frames
[docs] def render_thread_capture(self) -> Callable[[int, Any], None]: def _capture(idx: int, rgba: Any) -> None: # Runs on the render thread after present; ordered by frame index. if self._should_capture(idx): self.captured.append(rgba) return _capture
[docs] def capture_synchronous(self, loop: FrameLoop, frame_idx: int) -> None: if self._should_capture(frame_idx): self.captured.append(loop.engine.renderer.capture_frame())
[docs] class StreamSink(FrameSink): """Push each drawn frame to connected browser clients over WebSocket.""" ring_capacity = 2 def __init__(self, server: Any) -> None: self.server = server
[docs] def render_thread_capture(self) -> Callable[[int, Any], None]: server = self.server def _capture(_idx: int, rgba: Any) -> None: # Runs on the render thread after present. if server.has_clients(): try: server.push_frame(rgba) except (RuntimeError, AttributeError) as exc: log.debug("Streaming frame push skipped: %s", exc, exc_info=True) return _capture
[docs] def capture_synchronous(self, loop: FrameLoop, frame_idx: int) -> None: if not self.server.has_clients(): return import vulkan as _vk # type: ignore[import-untyped] try: pixels = loop.engine.renderer.capture_frame() self.server.push_frame(pixels) except (_vk.VkErrorOutOfDateKhr, _vk.VkErrorSurfaceLostKhr, RuntimeError, AttributeError) as exc: # A swapchain mid-resize/teardown or a renderer being torn down makes a # single capture transiently fail: streaming skips this frame. log.debug("Frame capture for streaming skipped: %s", exc, exc_info=True)
# ============================================================================ # Integration -- input source + windowed extras # ============================================================================
[docs] class FrameIntegration: """Per-loop input source and windowed extras.""" #: Reset the global Input/Draw2D state at begin (re-runnable headless paths). reset_globals: bool = False
[docs] def on_run_begin(self, loop: FrameLoop) -> None: """After the engine + tree exist, before the loop (e.g. start a stream server)."""
[docs] def on_run_end(self, loop: FrameLoop) -> None: """In the run() finally, before audio shutdown (e.g. stop a stream server)."""
[docs] def on_setup(self, loop: FrameLoop) -> None: """Extra setup after the adapter exists (input callbacks, clipboard)."""
[docs] def pre_tick(self, loop: FrameLoop) -> bool: """Input drain / window sync before the tick. Return False to stop the loop.""" return True
[docs] def mid_tick(self, loop: FrameLoop) -> None: """Between physics and logic tick (theme sync on the windowed path)."""
[docs] def post_tick(self, loop: FrameLoop) -> None: """After Draw2D submit (mouse picking on the windowed path)."""
[docs] def per_frame_telemetry(self, loop: FrameLoop) -> None: """Live per-frame telemetry (windowed path)."""
[docs] def on_loop_end(self, loop: FrameLoop) -> None: """At the stop boundary (headless captures final telemetry here)."""
[docs] def pre_render(self, loop: FrameLoop, cmd: Any) -> None: """Offscreen passes before the main render pass (SubViewports, probes).""" loop._default_pre_render(cmd)
[docs] class HeadlessIntegration(FrameIntegration): """Batch headless: InputSimulator drives Input; capture + telemetry at the end.""" reset_globals = True def __init__(self, on_frame: Callable[[int, float], bool | None] | None = None) -> None: self.on_frame = on_frame
[docs] def pre_tick(self, loop: FrameLoop) -> bool: if self.on_frame is None: return True physics_dt = 1.0 / loop.app._physics_fps return self.on_frame(loop.frame_idx, loop.frame_idx * physics_dt) is not False
[docs] def on_loop_end(self, loop: FrameLoop) -> None: loop._capture_telemetry()
[docs] class StreamingIntegration(FrameIntegration): """Browser streaming: drain queued client input, push readback to clients.""" def __init__(self, server: Any) -> None: self.server = server self._last_mouse_pos = (0.0, 0.0) self._primary_finger: int | None = None
[docs] def on_run_begin(self, loop: FrameLoop) -> None: self.server.start(loop.app.width, loop.app.height)
[docs] def on_run_end(self, loop: FrameLoop) -> None: self.server.stop()
[docs] def pre_tick(self, loop: FrameLoop) -> bool: self._process_input(loop.tree) return True
[docs] def pre_render(self, loop: FrameLoop, cmd: Any) -> None: renderer = loop.engine.renderer if not loop.pipelined: # SYNCHRONOUS path: byte-identical to prior streaming behaviour (main scene # only; _upload_transforms self-reserves its base-0 slice). SubViewport / # probe handling was never wired into synchronous streaming and stays out. renderer.pre_render(cmd) return # PIPELINED path (RENDER THREAD): reserve main slice, replay SubViewport SRUs. renderer.reserve_main_slice() if loop.adapter is not None: for sru in renderer._packet_subviewport_srus or []: loop.adapter.render_sru_from_plan(cmd, sru) renderer.pre_render(cmd)
def _process_input(self, tree: Any) -> None: """Drain queued input events from the server and route to Input + UI.""" from simvx.core import MouseButton from simvx.core.input import Input from .input_adapter import _KEY_MAP for evt in self.server.drain_input(): etype = evt.get("type") if etype == "key": code = evt.get("code", 0) pressed = evt.get("pressed", False) Input._on_key(code, pressed) key_name = _KEY_MAP.get(code) if key_name: if pressed: if not Input._keys.get(key_name): Input._keys_just_pressed[key_name] = True Input._keys[key_name] = True else: Input._keys[key_name] = False Input._keys_just_released[key_name] = True tree.ui_input(key=key_name, pressed=pressed) elif etype == "char": codepoint = evt.get("codepoint", 0) tree.ui_input(char=chr(codepoint)) elif etype == "mouse": button = evt.get("button", 0) pressed = evt.get("pressed", False) Input._on_mouse_button(button, pressed) btn = f"mouse_{button + 1}" if pressed: if not Input._keys.get(btn): Input._keys_just_pressed[btn] = True Input._keys[btn] = True else: Input._keys[btn] = False Input._keys_just_released[btn] = True tree.ui_input(mouse_pos=Input._mouse_pos, button=MouseButton(button), pressed=pressed) elif etype == "mousemove": x, y = evt.get("x", 0.0), evt.get("y", 0.0) old = Input._mouse_pos Input._mouse_pos = (x, y) Input._mouse_delta = (x - old[0], y - old[1]) pos = (x, y) if pos != self._last_mouse_pos: self._last_mouse_pos = pos tree.ui_input(mouse_pos=pos, button=None, pressed=False) elif etype == "scroll": dx, dy = evt.get("dx", 0.0), evt.get("dy", 0.0) Input._scroll_delta = (Input._scroll_delta[0] + dx, Input._scroll_delta[1] + dy) if dy > 0: tree.ui_input(key="scroll_up", pressed=True) elif dy < 0: tree.ui_input(key="scroll_down", pressed=True) elif etype == "touch": finger_id = evt.get("id", 0) action = evt.get("action", 0) x, y = evt.get("x", 0.0), evt.get("y", 0.0) pressure = evt.get("pressure", 1.0) Input._update_touch(finger_id, action, x, y, pressure) # Primary finger emulates mouse for UI. if action == 0 and self._primary_finger is None: self._primary_finger = finger_id Input._mouse_pos = (x, y) Input._mouse_delta = (0.0, 0.0) tree.ui_input(mouse_pos=(x, y), button=None, pressed=False) tree.ui_input(mouse_pos=(x, y), button=MouseButton.LEFT, pressed=True) elif action == 1 and finger_id == self._primary_finger: self._primary_finger = None Input._mouse_pos = (x, y) tree.ui_input(mouse_pos=(x, y), button=None, pressed=False) tree.ui_input(mouse_pos=(x, y), button=MouseButton.LEFT, pressed=False) elif action == 2 and finger_id == self._primary_finger: old = Input._mouse_pos Input._mouse_pos = (x, y) Input._mouse_delta = (x - old[0], y - old[1]) tree.ui_input(mouse_pos=(x, y), button=None, pressed=False)
[docs] class WindowedIntegration(FrameIntegration): """Visible game window: platform input callbacks, gamepad, resize, picking. Gamepad poll / window-resize / theme sync run in ``mid_tick`` (after physics, before the logic tick) to match the original ``_run_with_tree`` ordering. """ def __init__(self) -> None: self._last_mouse_pos = (0.0, 0.0) self._last_theme_gen = -1 self._theme_driven_bg = False
[docs] def on_run_end(self, loop: FrameLoop) -> None: # Drop the capture callback so a second in-process app run can't invoke a destroyed window. from simvx.core.input import Input Input._capture_mode_callback = None
[docs] def on_setup(self, loop: FrameLoop) -> None: from .input_adapter import ( char_callback, cursor_pos_callback, key_callback_with_ui, mouse_button_callback_with_ui, scroll_callback, set_ui_callbacks, ) app = loop.app engine = loop.engine tree = loop.tree bg = app._bg_colour self._theme_driven_bg = not (bg == "transparent" or isinstance(bg, tuple | list)) def _ui_char(ch: str) -> None: tree.ui_input(char=ch) def _ui_key(key_name: str, pressed: bool) -> None: tree.ui_input(key=key_name, pressed=pressed) def _ui_mouse(button: Any, pressed: bool) -> None: from simvx.core.input import Input tree.ui_input(mouse_pos=Input._mouse_pos, button=button, pressed=pressed) def _ui_motion(pos: tuple[float, float]) -> None: if pos != self._last_mouse_pos: self._last_mouse_pos = pos tree.ui_input(mouse_pos=pos, button=None, pressed=False) def _ui_touch(finger_id: int, action: int, x: float, y: float) -> None: tree.touch_input(finger_id, action, x, y) set_ui_callbacks( char=_ui_char, key=_ui_key, mouse=_ui_mouse, motion=_ui_motion, scroll=_ui_key, # scroll events use same key routing touch=_ui_touch, tree_propagate=tree.propagate_input, ) engine.set_key_callback(key_callback_with_ui) engine.set_mouse_button_callback(mouse_button_callback_with_ui) engine.set_cursor_pos_callback(cursor_pos_callback) engine.set_scroll_callback(scroll_callback) engine.set_char_callback(char_callback) # Wire mouse capture to the OS cursor: apply any mode set before the window existed, # then route future Input.set_mouse_capture_mode() calls straight to the backend. from simvx.core.input import Input engine.set_mouse_capture(int(Input.get_mouse_capture_mode())) Input._capture_mode_callback = lambda mode: engine.set_mouse_capture(int(mode)) win = engine._window if hasattr(win, "set_touch_callback"): from .input_adapter import touch_callback win.set_touch_callback(touch_callback) self._wire_clipboard(engine) # Store HiDPI content scale on theme (do NOT scale Draw2D base height). sx, _sy = engine.content_scale if sx > 1.01: from simvx.core.ui.theme import get_theme get_theme().ui_scale = sx get_theme()._sync_dicts() tree._platform_window = getattr(engine._window, "_window", None)
def _wire_clipboard(self, engine: Any) -> None: try: from simvx.core.ui.clipboard import set_backend as _set_clipboard from .app import _get_sdl3_type win = engine._window if isinstance(win, _get_sdl3_type()): import sdl3 as _sdl # type: ignore[import-not-found] def _sdl3_paste() -> str: val = _sdl.SDL_GetClipboardText() if val is None: return "" return val.decode("utf-8", errors="replace") if isinstance(val, bytes) else str(val) _set_clipboard(lambda text: _sdl.SDL_SetClipboardText(text.encode()), _sdl3_paste) else: import glfw # type: ignore[import-untyped] _win = win._window def _glfw_paste() -> str: val = glfw.get_clipboard_string(_win) if val is None: return "" return val.decode("utf-8", errors="replace") if isinstance(val, bytes) else str(val) _set_clipboard(lambda text: glfw.set_clipboard_string(_win, text), _glfw_paste) except (ImportError, AttributeError) as exc: log.debug("Clipboard backend unavailable; using no-op clipboard: %s", exc, exc_info=True)
[docs] def mid_tick(self, loop: FrameLoop) -> None: from simvx.core.input import Input from .input_adapter import poll_gamepads engine = loop.engine tree = loop.tree poll_gamepads(engine._window) if engine._window: ws = engine._window.get_window_size() if tree.screen_size != (ws[0], ws[1]): tree.screen_size = (ws[0], ws[1]) if hasattr(engine._window, "get_cursor_pos"): cx, cy = engine._window.get_cursor_pos() Input._mouse_pos = (cx, cy) if self._theme_driven_bg: from simvx.core.ui.theme import get_theme, theme_generation gen = theme_generation() if gen != self._last_theme_gen: self._last_theme_gen = gen c = get_theme().bg_black engine.clear_colour = [c[0], c[1], c[2], c[3]] # type: ignore[attr-defined]
[docs] def post_tick(self, loop: FrameLoop) -> None: from simvx.core import MouseButton from simvx.core.input import Input if Input._keys_just_pressed.get("mouse_1"): loop.tree.input_cast(Input._mouse_pos, button=MouseButton.LEFT)
[docs] def per_frame_telemetry(self, loop: FrameLoop) -> None: if not loop.pipelined: loop.app._populate_telemetry(frames_rendered=loop.frame_idx)
# ============================================================================ # FrameServices -- per-loop external request drain at the frame barrier # ============================================================================
[docs] class FrameService: """A unit of work drained once per frame on the loop (main) thread. The contract is the marshalling pattern external drivers (PEP 768 socket bridge, editor tooling) need: a producer thread enqueues work, and this runs on the one thread that owns the SceneTree, AT the post-tick barrier (tree fully ticked + Draw2D built, GPU handoff not yet started). Implement :meth:`on_frame_service`; keep it bounded and non-blocking. """
[docs] def on_frame_service(self, loop: FrameLoop) -> None: # noqa: D401 - hook """Drain queued work on the loop thread. Must not block the frame."""
[docs] def on_loop_end(self, loop: FrameLoop) -> None: # noqa: D401 - hook """Release resources when the owning loop tears down (close sockets, join threads)."""
[docs] class FrameServices: """Ordered registry of :class:`FrameService` objects drained each frame. Loop-level (not integration-level) so every clock/sink/integration combo -- windowed, headless, streaming, and the externally-driven agent/editor path -- reaches the same barrier through ``FrameLoop._update`` with zero per-consumer wiring. Registration is the canonical way to attach a per-frame drain to a *running* loop; it is dependency-free (pure stdlib list). """ def __init__(self) -> None: self._services: list[FrameService] = []
[docs] def add(self, service: FrameService) -> FrameService: """Register a service. Returns it for ``svc = loop.services.add(MyService())``.""" self._services.append(service) return service
[docs] def remove(self, service: FrameService) -> None: """Deregister a service (idempotent).""" if service in self._services: self._services.remove(service)
[docs] def __bool__(self) -> bool: return bool(self._services)
[docs] def run(self, loop: FrameLoop) -> None: """Drain every registered service on the loop thread, in registration order. A failing service is fully logged and the remaining services still run; the loop must never be left half-drained or silently killed by one bad service. """ for service in self._services: try: service.on_frame_service(loop) except Exception: log.exception("FrameService %r raised during frame drain", service)
[docs] def close(self, loop: FrameLoop) -> None: """Tell every service the loop is ending, then clear the registry. Called from FrameLoop._teardown so a registered bridge (its accept thread + bound socket) is shut down with the loop instead of leaking until process exit. """ for service in self._services: try: service.on_loop_end(loop) except Exception: log.exception("FrameService %r raised during loop-end close", service) self._services.clear()
# ============================================================================ # FrameLoop -- owns the engine lifecycle + the one per-frame core # ============================================================================
[docs] class FrameLoop: """One per-frame engine drive seam, parameterised by clock + sink + integration.""" #: Set by run() before any frame method executes. engine: Engine def __init__( self, app: App, root_node: Any, *, clock: FrameClock, sink: FrameSink, integration: FrameIntegration, visible: bool, vsync: bool, ) -> None: self.app = app self.root_node = root_node self.clock = clock self.sink = sink self.integration = integration self._visible = visible self._vsync = vsync #: Per-frame external request drain, serviced at the post-tick barrier. Empty #: (zero cost) unless a consumer registers a FrameService onto the running loop. self.services = FrameServices() self.tree: Any = None self.adapter: Any = None self.pipelined = False self.frame_idx = 0 self._render_ring: Any = None self._render_thread: Any = None self._packet_idx = 0 # 2D item-pipeline (design §2.6, P1.6): the only 2D renderer. Lazily # created the first frame: one RenderItemCache + ItemPublisher for the # main scene. self._item_cache: Any = None self._item_publisher: Any = None self._item_structure_version = -1 # ---------------------------------------------------------------- run def _build(self) -> None: """Construct the engine + tree (shared by run() and begin()).""" from simvx.core import Input, SceneTree from .draw2d import Draw2D from .engine import Engine app = self.app if self.integration.reset_globals: app._engine = None app._scene_adapter = None # type: ignore[attr-defined] Input._reset() Draw2D._reset() engine = Engine( width=app.width, height=app.height, title=app.title, backend=app._backend_name, visible=self._visible, vsync=self._vsync, target_fps=app._target_fps if self._visible else None, ) self.engine = engine app._engine = engine engine._multi_gpu_requested = app._multi_gpu self._apply_clear_colour() self.tree = SceneTree(screen_size=(app.width, app.height)) app._init_audio_backend(self.tree) app.last_telemetry = {} self.integration.on_run_begin(self) if self not in _active_loops: _active_loops.append(self) def _teardown(self) -> None: """Mirror of _build (shared by run() finally and end()).""" if self in _active_loops: _active_loops.remove(self) # Shut down any registered services (e.g. the agent socket bridge) so their # threads/sockets do not outlive the loop. self.services.close(self) self.integration.on_run_end(self) self.app._sub_viewports = None self.app._shutdown_audio(self.tree)
[docs] def run(self) -> None: """Build the engine and drive frames (blocking) until the loop stops.""" self._build() try: self.engine.run( callback=self._update, setup=self._setup, render=self._render_fn, pre_render=self._pre_render_fn, cleanup=self._cleanup, pre_shutdown=self._stop_render_thread, frame_driver=self._frame_driver, ) finally: self._teardown()
# ----------------------------------------------------- external driving
[docs] def begin(self) -> None: """Externally-driven setup: build the engine, create the window + device, run setup. Pair with :meth:`step_frame` / :meth:`capture` / :meth:`end`. For an External (FixedStepClock) clock -- the agent live-session and editor viewport.""" self._build() self.engine.begin(setup=self._setup, render=self._render_fn, pre_render=self._pre_render_fn)
[docs] def step_frame(self, dt: float | None = None) -> bool: """Advance exactly one frame; returns False when the loop should stop.""" if dt is not None and isinstance(self.clock, FixedStepClock): self.clock._dt = dt return self.engine.step(callback=self._update, frame_driver=self._frame_driver)
[docs] def capture(self, *, scale: float = 1.0, region: tuple[int, int, int, int] | None = None) -> Any: """Read back the last drawn frame as an (H, W, 4) uint8 RGBA array. Optional ``region=[x1,y1,x2,y2]`` crops and ``scale`` (<1.0) nearest-downsamples at the source -- the cheap detail/cost levers from the design. """ import numpy as np # When pipelined, the render thread shares the graphics queue + command pool and # guards them with renderer._frame_state_lock (see render_thread). A capture from # the loop thread (e.g. the agent bridge draining at the post-tick barrier) must # take that same lock or it races the in-flight present. Synchronous mode has no # render thread, so no lock is needed. if self.pipelined: with self.engine.renderer._frame_state_lock: img = self.engine.renderer.capture_frame() else: img = self.engine.renderer.capture_frame() if region is not None: x1, y1, x2, y2 = region img = img[y1:y2, x1:x2] if scale != 1.0 and img.size: h, w = img.shape[:2] nh, nw = max(1, int(h * scale)), max(1, int(w * scale)) ys = np.minimum((np.arange(nh) / scale).astype(int), h - 1) xs = np.minimum((np.arange(nw) / scale).astype(int), w - 1) img = img[ys][:, xs] return img
[docs] def end(self) -> None: """Externally-driven teardown (counterpart to :meth:`begin`).""" try: self.engine.end(pre_shutdown=self._stop_render_thread, cleanup=self._cleanup) finally: self._teardown()
def _apply_clear_colour(self) -> None: app = self.app if app._bg_colour == "transparent": self.engine.clear_colour = [0.0, 0.0, 0.0, 0.0] # type: ignore[attr-defined] elif isinstance(app._bg_colour, tuple | list): self.engine.clear_colour = list(app._bg_colour) # type: ignore[attr-defined] else: from simvx.core.ui.theme import get_theme c = get_theme().bg_black self.engine.clear_colour = [c[0], c[1], c[2], c[3]] # type: ignore[attr-defined] # ---------------------------------------------------------------- setup def _setup(self) -> None: from .renderer.sub_viewport import SubViewportManager from .scene_adapter import SceneAdapter app = self.app engine = self.engine renderer = engine.renderer self.adapter = SceneAdapter(engine, renderer) app._scene_adapter = self.adapter # type: ignore[attr-defined] # Exposed for editor game viewport app._sub_viewports = SubViewportManager(engine, self.adapter) self.tree._app = app engine._scene_tree = self.tree # type: ignore[attr-defined] engine._scene_adapter = self.adapter # type: ignore[attr-defined] # for ReflectionProbePass # After a device-loss rebuild the engine creates a NEW Renderer; re-point the # adapter + sub-viewports at it (else they keep submitting to the dead one). engine._on_device_rebuilt = self._rebuild_scene_adapter # type: ignore[attr-defined] self.tree.set_root(self.root_node) self.pipelined = app._resolve_pipelined(self.tree) if self.pipelined: self._start_render_thread() # Let a device-loss / healthy rebuild driven inside the engine quiesce # and respawn the render thread on the quiescent main thread (R4). engine._render_thread_controller = _RenderThreadController(self) self.integration.on_setup(self) self.clock.on_begin(app) # ---------------------------------------------------------------- frame def _clock_phase(self, _root: Any, frame_dt: float, phase: str) -> None: """Drive the clock's physics / logic step for the given scene phase. The clock keeps owning its dt/accumulator and physics-stepping policy; this adapter only lets ``step_scene_logic`` order the shared sequence. """ if phase == "physics": self.clock.step_physics(self.app, self.tree, frame_dt) else: self.clock.tick_logic(self.app, self.tree, frame_dt) def _update(self) -> None: engine = self.engine # Capture the frame drawn by the previous GPU frame (synchronous path; in # pipelined mode the render thread captures via render_thread_capture()). if self.frame_idx > 0 and not self.pipelined: self.sink.capture_synchronous(self, self.frame_idx - 1) if self.clock.should_stop(self.frame_idx): self._drain_pipeline() self.integration.on_loop_end(self) engine._running = False return if not self.integration.pre_tick(self): if self.frame_idx > 0: self._drain_pipeline() self.integration.on_loop_end(self) engine._running = False return from simvx.core import Input, ScenePhases, step_scene_logic from .draw2d import Draw2D frame_dt = self.clock.advance(self.app) # The physics->mid_tick->logic ORDER is the shared canonical sequence # (simvx.core.scene_step). The clock keeps owning its dt/accumulator and # physics-stepping policy, and mid_tick (gamepad + window-size sync) keeps # its between-physics-and-logic slot; both are passed in as phase hooks so # the phase order is byte-identical to the previous inline form. step_scene_logic( self.tree, frame_dt, phases=ScenePhases( node_walk=self._clock_phase, ui_events=lambda: self.integration.mid_tick(self), ), ) Draw2D._reset() self.tree.render(Draw2D) self.integration.post_tick(self) # Post-tick barrier: tree fully ticked + Draw2D built, GPU handoff not started. # External requests (PEP 768 socket bridge, editor tooling) are drained here on # the loop thread so all SceneTree mutation stays single-threaded. No cost when # nothing is registered. if self.services: self.services.run(self) if not self.pipelined: engine.renderer.begin_frame() self.adapter.submit_scene(self.tree) # 2D item pipeline (design §2.6, P1.6): publish the item view and bind # it for ``render`` to submit. item_view, item_camera = self._publish_item_view() engine.renderer.set_item_view(item_view, item_camera) Input._end_frame() Input._new_frame() self.integration.per_frame_telemetry(self) self.frame_idx += 1 def _drain_pipeline(self) -> None: if self.pipelined and self._render_thread is not None and self.frame_idx > 0: self._render_thread.wait_for_frame(self.frame_idx - 1) def _capture_telemetry(self) -> None: app = self.app if self.pipelined and self._render_thread is not None: with self.engine.renderer._frame_state_lock: app._populate_telemetry(frames_rendered=self.frame_idx, occlusion_wait=True) else: app._populate_telemetry(frames_rendered=self.frame_idx, occlusion_wait=True) # ---------------------------------------------------------------- gpu hooks def _publish_item_view(self) -> tuple[Any, tuple]: """Collect + publish the 2D item view for this frame (design §2.6). Returns ``(view, camera_affine)``. Lazily creates the per-scene ``RenderItemCache`` + ``ItemPublisher`` on the first frame. The cache (design §2.7) skips a clean frame, applies an in-place item/transform patch when only some drawables are dirty (P2 auto-invalidation via the Drawable2D render bits), or re-collects on a structure / camera / theme change. The publisher republishes the SAME frozen view (zero-copy) on a clean frame, and re-freezes when the cache ``epoch`` advances (a patch mutates the columns in place but keeps the result object's identity) -- so a patched frame reaches the render thread. """ from .render2d import ( ItemPublisher, RenderItemCache, ViewState, camera_affine_from_tree, ) if self._item_cache is None: from simvx.core.ui.theme import theme_generation self._item_cache = RenderItemCache(theme_generation=theme_generation) self._item_publisher = ItemPublisher() cam = getattr(self.tree, "_current_camera_2d", None) if cam is not None: z = float(cam.zoom) if cam.zoom > 0 else 1.0 view_state = ViewState( offset=(float(cam.current[0]), float(cam.current[1])), zoom=(z, z), rotation=0.0, viewport=tuple(int(s) for s in self.tree._screen_size), ) else: view_state = ViewState(viewport=tuple(int(s) for s in self.tree._screen_size)) struct_v = int(getattr(self.tree, "_structure_version", 0)) result = self._item_cache.frame(self.tree.root, structure_version=struct_v, view=view_state) # Pass the cache epoch so an in-place patch (same result object, advanced # epoch) re-freezes the published view; a clean frame (same epoch) reuses # it verbatim (zero copy). view = self._item_publisher.publish(result, epoch=self._item_cache.epoch) return view, camera_affine_from_tree(self.tree) def _frame_driver(self) -> None: from .renderer.render_packet import extract_render_packet engine = self.engine if not self.pipelined or self._render_ring is None or self._render_thread is None: # Pipelining off: synchronous GPU frame on the main thread (begin_frame + # submit_scene already ran in _update), byte-identical to the default path. engine._draw_frame() return # Pipelined device-loss recovery (R4): the render thread caught # VK_ERROR_DEVICE_LOST, flagged the engine, and exited (it cannot rebuild # the device it records on). Drive the rebuild HERE on the quiescent main # thread; ``_recover_device`` -> ``_rebuild_device`` quiesces + respawns the # render thread via the controller. Skip handing off this frame's packet # (its lists targeted the dead renderer); the next frame re-submits. if engine._device_lost: if not engine._recover_device(): engine._running = False return renderer = engine.renderer with renderer._frame_state_lock: renderer.begin_frame() self.adapter.submit_scene(self.tree) self.app._warn_pipelined_unpacketised(self.tree) # Publish the 2D item view inside the sync point (design §4): the # publisher freezes the immutable view the render thread reads. item_view, item_camera = self._publish_item_view() packet = extract_render_packet( renderer, self.tree, frame_index=self._packet_idx, sub_viewports=self.app._sub_viewports, item_view=item_view, item_camera=item_camera, ) try: self._render_ring.submit(packet) except RuntimeError: engine._running = False return self._packet_idx += 1 if not self._render_thread.alive: engine._running = False def _pre_render_fn(self, cmd: Any) -> None: self.integration.pre_render(self, cmd) def _default_pre_render(self, cmd: Any) -> None: """Reserve the main slice, render SubViewports/probes (sync) or replay (pipelined).""" app = self.app renderer = self.engine.renderer renderer.reserve_main_slice() if not self.pipelined: if app._sub_viewports is not None and self.adapter is not None: app._sub_viewports.render_all(cmd, self.tree) if self.adapter is not None: renderer.capture_reflection_probes(cmd, self.adapter) elif self.adapter is not None: for sru in renderer._packet_subviewport_srus or []: self.adapter.render_sru_from_plan(cmd, sru) renderer.pre_render(cmd) def _render_fn(self, cmd: Any, extent: Any) -> None: self.engine.renderer.render(cmd) def _cleanup(self) -> None: if self.app._sub_viewports is not None: self.app._sub_viewports.destroy() def _rebuild_scene_adapter(self) -> None: """Re-point the SceneAdapter + SubViewportManager at the rebuilt renderer. Installed as ``engine._on_device_rebuilt``; runs on the main thread inside ``Engine._rebuild_device`` after the new Renderer is created. SceneAdapter holds a hard renderer reference (``scene_adapter.py:80``), so a device rebuild that makes a fresh Renderer would otherwise leave the adapter feeding the dead one (blank post-rebuild frames). Recreating it fresh mirrors first-time ``_setup``. """ from .renderer.sub_viewport import SubViewportManager from .scene_adapter import SceneAdapter app = self.app engine = self.engine self.adapter = SceneAdapter(engine, engine.renderer) app._scene_adapter = self.adapter # type: ignore[attr-defined] engine._scene_adapter = self.adapter # type: ignore[attr-defined] app._sub_viewports = SubViewportManager(engine, self.adapter) # type: ignore[attr-defined] def _start_render_thread(self) -> None: """Create + start a fresh packet ring and render thread on the live device. A Python thread is single-use, so the render thread is RECREATED (not restarted) here -- used both at first setup and after a device rebuild. The new thread re-establishes the command-pool + single-recorder invariant on the rebuilt device. """ from .renderer.render_packet import RenderPacketRing from .renderer.render_thread import RenderThread engine = self.engine self._render_ring = RenderPacketRing(capacity=self.sink.ring_capacity) self._render_thread = RenderThread( engine, engine.renderer, self._render_ring, capture=self.sink.render_thread_capture() ) # Packet indices restart at 0 on the fresh ring/thread so headless capture # frame-index sequencing stays consistent with the new thread's counter. self._packet_idx = 0 self._render_thread.start() def _stop_render_thread(self) -> None: if self._render_thread is not None: self._render_thread.stop() self._render_thread = None self._render_ring = None