"""FrameLoop: the single per-frame engine drive seam (clock + sink + step).
Historically ``App._run_with_tree`` (visible), ``App.run_headless``,
``App.run_streaming``, and the editor's PlayMode each hand-rolled the same
``physics -> interpolate -> tick -> Draw2D -> submit_scene -> present/capture``
core plus an identical pipelined ``frame_driver``. FrameLoop owns that core once;
the per-loop differences are expressed as three pluggable pieces:
- **Clock**: how wall time advances and the physics-stepping strategy
(``RealTimeClock`` accumulator vs ``FixedStepClock`` single-step).
- **Sink**: where the rendered frame goes and how capture is wired
(``SwapchainSink`` present, ``OffscreenSink`` readback, ``StreamSink`` push).
- **Integration**: input source + windowed extras (callbacks, gamepad, resize,
picking) -- supplied per loop.
The LLM agent live-session and the editor viewport are just ``FixedStepClock``
(externally driven) + an offscreen or swapchain sink. This module is migrated
one consumer at a time; ``run_headless`` is the first to land on it.
"""
from __future__ import annotations
import logging
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from .app import App
from .engine import Engine
log = logging.getLogger(__name__)
class _RenderThreadController:
    """Handle through which ``Engine._rebuild_device`` pauses and restarts the render thread.

    Only installed on the engine for a pipelined run. ``stop`` closes the packet
    ring and joins the thread, so no stale packet that references dead handles is
    recorded against the device being destroyed. ``start`` brings up a fresh ring
    and thread on the rebuilt device. Because this lives on the loop side, the
    engine never has to import the render-thread machinery for a synchronous run.
    """

    def __init__(self, loop: FrameLoop) -> None:
        # The loop that owns the render thread; both operations delegate to it.
        self._loop = loop

    def stop(self) -> None:
        """Ask the owning loop to stop its render thread."""
        owner = self._loop
        owner._stop_render_thread()

    def start(self) -> None:
        """Ask the owning loop to start its render thread."""
        owner = self._loop
        owner._start_render_thread()
#: Loops that have built their engine + tree and not yet torn down. Lets an external
#: introspector (e.g. a PEP 768 injected bootstrap) discover the running loop in-process
#: without the game cooperating. Pure stdlib; insertion-ordered (most recent last).
_active_loops: list[FrameLoop] = []


def active_loops() -> list[FrameLoop]:
    """Return a copy of the registry of running :class:`FrameLoop` objects (most recent last).

    The result is a new list, so callers may mutate it without touching the registry.
    """
    return _active_loops.copy()
# ============================================================================
# Clock -- time advance + physics stepping strategy
# ============================================================================
[docs]
class FrameClock:
    """Strategy that picks each frame's dt and drives the physics / logic steps."""

    def on_begin(self, app: App) -> None:  # noqa: D401 - hook
        """Hook run once, after setup and ahead of the first frame."""
        return None

    def should_stop(self, frame_idx: int) -> bool:
        """Tell the loop whether to end at ``frame_idx``; the base clock never stops."""
        return False

    def advance(self, app: App) -> float:
        """Produce the dt for the coming frame. Must be overridden."""
        raise NotImplementedError

    def step_physics(self, app: App, tree: Any, frame_dt: float) -> None:
        """Advance physics for this frame. Must be overridden."""
        raise NotImplementedError

    def tick_logic(self, app: App, tree: Any, frame_dt: float) -> None:
        """Tick the tree with ``frame_dt`` multiplied by the app's time scale."""
        scaled_dt = frame_dt * app._time_scale
        tree.tick(scaled_dt)
[docs]
class FixedStepClock(FrameClock):
    """Fixed-timestep clock: exactly one physics step per frame.

    Given ``frames`` it acts as the headless batch clock and stops after that many
    frames. With ``frames=None`` and an explicit ``dt`` it is the externally-driven
    clock used by the agent / editor: the caller advances frames and there is no
    internal cap.
    """

    def __init__(self, *, frames: int | None = None, dt: float | None = None) -> None:
        self.frames = frames
        self._dt = dt

    def should_stop(self, frame_idx: int) -> bool:
        """Stop once ``frame_idx`` reaches ``frames``; never stop when ``frames`` is None."""
        if self.frames is None:
            return False
        return frame_idx >= self.frames

    def advance(self, app: App) -> float:
        """Return the explicit ``dt`` if one is set, else one physics period of the app."""
        if self._dt is None:
            return 1.0 / app._physics_fps
        return self._dt

    def step_physics(self, app: App, tree: Any, frame_dt: float) -> None:
        """Run a single time-scaled physics tick, then snap to the current pose."""
        tree.physics_tick(frame_dt * app._time_scale)
        # alpha=1 selects the current pose: headless captures happen at the step
        # boundary, so no real interpolation is needed (byte-identical to the
        # immediate write).
        tree.interpolate_physics(1.0)

    def tick_logic(self, app: App, tree: Any, frame_dt: float) -> None:
        """Tick the tree with the time-scaled frame dt."""
        scaled_dt = frame_dt * app._time_scale
        tree.tick(scaled_dt)
[docs]
class RealTimeClock(FrameClock):
    """Wall-clock frame dt feeding a fixed-timestep physics accumulator (windowed / streaming)."""

    def __init__(self) -> None:
        self._accum = 0.0  # wall time not yet consumed by physics steps
        self._last = 0.0  # perf_counter reading taken at the previous advance()

    def on_begin(self, app: App) -> None:
        """Take the initial time reading so the first frame's dt starts from here."""
        import time

        self._last = time.perf_counter()

    def advance(self, app: App) -> float:
        """Return the wall time since the previous call, capped at 100ms."""
        import time

        now = time.perf_counter()
        elapsed = now - self._last
        self._last = now
        return min(elapsed, 0.1)  # cap at 100ms

    def step_physics(self, app: App, tree: Any, frame_dt: float) -> None:
        """Run as many fixed physics steps as the accumulator covers, then interpolate."""
        step = 1.0 / app._physics_fps
        scaled_step = step * app._time_scale
        self._accum += frame_dt
        # The accumulator is drained in unscaled steps; each tick receives the
        # time-scaled step.
        while self._accum >= step:
            tree.physics_tick(scaled_step)
            self._accum -= step
        # Whatever is left over is the fraction of a step passed to interpolation.
        alpha = self._accum / step
        tree.interpolate_physics(alpha)
# ============================================================================
# Sink -- where the rendered frame goes + capture wiring
# ============================================================================
[docs]
class FrameSink:
    """Destination of a rendered frame; provides the render-thread capture and the sync readback."""

    #: RenderPacketRing capacity: 2 overlaps frames (windowed/stream), 1 lock-steps
    #: the producer with the render thread so per-frame capture is exact (headless).
    ring_capacity: int = 2

    def render_thread_capture(self) -> Callable[[int, Any], None] | None:
        """Return the callback the pipelined RenderThread runs after present, or None for none."""
        return None

    def capture_synchronous(self, loop: FrameLoop, frame_idx: int) -> None:
        """Read back the just-drawn frame in synchronous mode; the base sink does nothing."""
        return None
[docs]
class SwapchainSink(FrameSink):
    """Sink for windowed games: frames are presented to the window and never read back."""

    # Same capacity as the base sink, restated so the windowed choice is explicit.
    ring_capacity = 2
[docs]
class OffscreenSink(FrameSink):
    """Sink that reads frames back into ``captured`` (headless tests, agent capture).

    Frame selection: when ``capture_fn`` is given it alone decides; otherwise
    ``capture_frames=None`` keeps every frame and a sequence keeps only the
    listed indices (the default empty tuple therefore keeps nothing).
    """

    ring_capacity = 1

    def __init__(self, *, capture_frames: Sequence[int] | None = (), capture_fn: Callable[[int], bool] | None = None):
        self.captured: list[Any] = []
        self.capture_fn = capture_fn
        self.capture_frames = capture_frames

    def _should_capture(self, idx: int) -> bool:
        """Decide whether frame ``idx`` is kept."""
        predicate = self.capture_fn
        if predicate is not None:
            return bool(predicate(idx))
        wanted = self.capture_frames
        if wanted is None:
            return True
        return idx in wanted

    def render_thread_capture(self) -> Callable[[int, Any], None]:
        """Build the render-thread callback that appends selected frames to ``captured``."""

        def _on_frame(idx: int, rgba: Any) -> None:
            # Runs on the render thread after present; ordered by frame index.
            if not self._should_capture(idx):
                return
            self.captured.append(rgba)

        return _on_frame

    def capture_synchronous(self, loop: FrameLoop, frame_idx: int) -> None:
        """Synchronous path: read the frame back from the renderer if it is selected."""
        if not self._should_capture(frame_idx):
            return
        self.captured.append(loop.engine.renderer.capture_frame())
[docs]
class StreamSink(FrameSink):
    """Sink that forwards every drawn frame to the stream server's connected clients."""

    ring_capacity = 2

    def __init__(self, server: Any) -> None:
        self.server = server

    def render_thread_capture(self) -> Callable[[int, Any], None]:
        """Build the render-thread callback that pushes a frame when clients are connected."""
        target = self.server

        def _push(_idx: int, rgba: Any) -> None:
            # Runs on the render thread after present.
            if not target.has_clients():
                return
            try:
                target.push_frame(rgba)
            except (RuntimeError, AttributeError) as exc:
                log.debug("Streaming frame push skipped: %s", exc, exc_info=True)

        return _push

    def capture_synchronous(self, loop: FrameLoop, frame_idx: int) -> None:
        """Synchronous path: read the frame back and push it, skipping transient failures."""
        if not self.server.has_clients():
            return
        import vulkan as _vk  # type: ignore[import-untyped]

        try:
            frame = loop.engine.renderer.capture_frame()
            self.server.push_frame(frame)
        except (_vk.VkErrorOutOfDateKhr, _vk.VkErrorSurfaceLostKhr, RuntimeError, AttributeError) as exc:
            # A swapchain mid-resize/teardown or a renderer being torn down makes a
            # single capture transiently fail: streaming skips this frame.
            log.debug("Frame capture for streaming skipped: %s", exc, exc_info=True)
# ============================================================================
# Integration -- input source + windowed extras
# ============================================================================
[docs]
class FrameIntegration:
    """Base set of per-loop hooks: input source plus windowed extras.

    Every hook except :meth:`pre_tick` and :meth:`pre_render` is a no-op here;
    subclasses override the ones they need.
    """

    #: Reset the global Input/Draw2D state at begin (re-runnable headless paths).
    reset_globals: bool = False

    def on_run_begin(self, loop: FrameLoop) -> None:
        """Runs once the engine + tree exist, ahead of the loop (e.g. start a stream server)."""
        return None

    def on_run_end(self, loop: FrameLoop) -> None:
        """Runs in the run() finally, ahead of audio shutdown (e.g. stop a stream server)."""
        return None

    def on_setup(self, loop: FrameLoop) -> None:
        """Additional setup once the adapter exists (input callbacks, clipboard)."""
        return None

    def pre_tick(self, loop: FrameLoop) -> bool:
        """Input drain / window sync ahead of the tick. Returning False stops the loop."""
        return True

    def mid_tick(self, loop: FrameLoop) -> None:
        """Slot between physics and the logic tick (theme sync on the windowed path)."""
        return None

    def post_tick(self, loop: FrameLoop) -> None:
        """Slot after Draw2D submit (mouse picking on the windowed path)."""
        return None

    def per_frame_telemetry(self, loop: FrameLoop) -> None:
        """Live telemetry gathered every frame (windowed path)."""
        return None

    def on_loop_end(self, loop: FrameLoop) -> None:
        """Runs at the stop boundary (headless captures final telemetry here)."""
        return None

    def pre_render(self, loop: FrameLoop, cmd: Any) -> None:
        """Offscreen passes ahead of the main render pass (SubViewports, probes).

        The base implementation delegates to the loop's default pre-render.
        """
        loop._default_pre_render(cmd)
[docs]
class HeadlessIntegration(FrameIntegration):
    """Batch headless integration: an optional per-frame callback, telemetry captured at the end."""

    reset_globals = True

    def __init__(self, on_frame: Callable[[int, float], bool | None] | None = None) -> None:
        self.on_frame = on_frame

    def pre_tick(self, loop: FrameLoop) -> bool:
        """Call ``on_frame(frame_idx, time)``; only an explicit ``False`` result stops the loop.

        ``time`` is the frame index multiplied by one physics period of the app.
        """
        callback = self.on_frame
        if callback is None:
            return True
        period = 1.0 / loop.app._physics_fps
        verdict = callback(loop.frame_idx, loop.frame_idx * period)
        return verdict is not False

    def on_loop_end(self, loop: FrameLoop) -> None:
        """Capture the final telemetry through the loop."""
        loop._capture_telemetry()
[docs]
class StreamingIntegration(FrameIntegration):
    """Browser streaming: drain queued client input, push readback to clients.

    Events come from remote clients through ``server.drain_input()``. A ``char``
    event whose codepoint cannot be turned into a character is skipped (and
    logged at debug level) rather than raising out of the frame loop.
    """

    def __init__(self, server: Any) -> None:
        self.server = server
        # Last pointer position forwarded to the UI; used to drop duplicate moves.
        self._last_mouse_pos = (0.0, 0.0)
        # Finger currently emulating the mouse for the UI, or None when none is.
        self._primary_finger: int | None = None

    def on_run_begin(self, loop: FrameLoop) -> None:
        """Start the stream server with the app's width and height."""
        self.server.start(loop.app.width, loop.app.height)

    def on_run_end(self, loop: FrameLoop) -> None:
        """Stop the stream server."""
        self.server.stop()

    def pre_tick(self, loop: FrameLoop) -> bool:
        """Drain queued client input into the tree; always lets the loop continue."""
        self._process_input(loop.tree)
        return True

    def pre_render(self, loop: FrameLoop, cmd: Any) -> None:
        """Run the renderer's pre-render; the pipelined path also replays SubViewport SRUs."""
        renderer = loop.engine.renderer
        if not loop.pipelined:
            # SYNCHRONOUS path: byte-identical to prior streaming behaviour (main scene
            # only; _upload_transforms self-reserves its base-0 slice). SubViewport /
            # probe handling was never wired into synchronous streaming and stays out.
            renderer.pre_render(cmd)
            return
        # PIPELINED path (RENDER THREAD): reserve main slice, replay SubViewport SRUs.
        renderer.reserve_main_slice()
        if loop.adapter is not None:
            for sru in renderer._packet_subviewport_srus or []:
                loop.adapter.render_sru_from_plan(cmd, sru)
        renderer.pre_render(cmd)

    @staticmethod
    def _latch_button(input_state: Any, name: str, pressed: bool) -> None:
        """Record a press or release of ``name`` in the Input key tables.

        A press is flagged as "just pressed" only when the key was not already
        held; a release always flags "just released".
        """
        if pressed:
            if not input_state._keys.get(name):
                input_state._keys_just_pressed[name] = True
            input_state._keys[name] = True
        else:
            input_state._keys[name] = False
            input_state._keys_just_released[name] = True

    def _process_input(self, tree: Any) -> None:
        """Drain queued input events from the server and route to Input + UI."""
        from simvx.core import MouseButton
        from simvx.core.input import Input

        from .input_adapter import _KEY_MAP

        for evt in self.server.drain_input():
            etype = evt.get("type")
            if etype == "key":
                code = evt.get("code", 0)
                pressed = evt.get("pressed", False)
                Input._on_key(code, pressed)
                key_name = _KEY_MAP.get(code)
                if key_name:
                    self._latch_button(Input, key_name, pressed)
                    tree.ui_input(key=key_name, pressed=pressed)
            elif etype == "char":
                codepoint = evt.get("codepoint", 0)
                try:
                    char = chr(codepoint)
                except (TypeError, ValueError, OverflowError):
                    # chr() rejects non-integers and values outside the Unicode
                    # range; one bad client event must not stop the loop.
                    log.debug("Ignoring char event with invalid codepoint %r", codepoint)
                    continue
                tree.ui_input(char=char)
            elif etype == "mouse":
                button = evt.get("button", 0)
                pressed = evt.get("pressed", False)
                Input._on_mouse_button(button, pressed)
                # Mouse buttons share the key tables under the name "mouse_<button + 1>".
                self._latch_button(Input, f"mouse_{button + 1}", pressed)
                tree.ui_input(mouse_pos=Input._mouse_pos, button=MouseButton(button), pressed=pressed)
            elif etype == "mousemove":
                x, y = evt.get("x", 0.0), evt.get("y", 0.0)
                old = Input._mouse_pos
                Input._mouse_pos = (x, y)
                Input._mouse_delta = (x - old[0], y - old[1])
                pos = (x, y)
                # Only forward a move to the UI when the position actually changed.
                if pos != self._last_mouse_pos:
                    self._last_mouse_pos = pos
                    tree.ui_input(mouse_pos=pos, button=None, pressed=False)
            elif etype == "scroll":
                dx, dy = evt.get("dx", 0.0), evt.get("dy", 0.0)
                Input._scroll_delta = (Input._scroll_delta[0] + dx, Input._scroll_delta[1] + dy)
                if dy > 0:
                    tree.ui_input(key="scroll_up", pressed=True)
                elif dy < 0:
                    tree.ui_input(key="scroll_down", pressed=True)
            elif etype == "touch":
                finger_id = evt.get("id", 0)
                action = evt.get("action", 0)
                x, y = evt.get("x", 0.0), evt.get("y", 0.0)
                pressure = evt.get("pressure", 1.0)
                Input._update_touch(finger_id, action, x, y, pressure)
                # Primary finger emulates mouse for UI: action 0 presses LEFT,
                # action 1 releases it, action 2 moves the pointer.
                if action == 0 and self._primary_finger is None:
                    self._primary_finger = finger_id
                    Input._mouse_pos = (x, y)
                    Input._mouse_delta = (0.0, 0.0)
                    tree.ui_input(mouse_pos=(x, y), button=None, pressed=False)
                    tree.ui_input(mouse_pos=(x, y), button=MouseButton.LEFT, pressed=True)
                elif action == 1 and finger_id == self._primary_finger:
                    self._primary_finger = None
                    Input._mouse_pos = (x, y)
                    tree.ui_input(mouse_pos=(x, y), button=None, pressed=False)
                    tree.ui_input(mouse_pos=(x, y), button=MouseButton.LEFT, pressed=False)
                elif action == 2 and finger_id == self._primary_finger:
                    old = Input._mouse_pos
                    Input._mouse_pos = (x, y)
                    Input._mouse_delta = (x - old[0], y - old[1])
                    tree.ui_input(mouse_pos=(x, y), button=None, pressed=False)
[docs]
class WindowedIntegration(FrameIntegration):
    """Integration for a visible game window: input callbacks, gamepad, resize, picking.

    Gamepad poll / window-resize / theme sync happen in ``mid_tick`` (after physics,
    before the logic tick) so the ordering matches the original ``_run_with_tree``.
    """

    def __init__(self) -> None:
        self._theme_driven_bg = False
        self._last_theme_gen = -1
        self._last_mouse_pos = (0.0, 0.0)

    def on_run_end(self, loop: FrameLoop) -> None:
        """Clear the mouse-capture callback installed by :meth:`on_setup`."""
        # Drop the capture callback so a second in-process app run can't invoke a destroyed window.
        from simvx.core.input import Input

        Input._capture_mode_callback = None

    def on_setup(self, loop: FrameLoop) -> None:
        """Install UI / platform input callbacks, mouse capture, clipboard and HiDPI scale."""
        from .input_adapter import (
            char_callback,
            cursor_pos_callback,
            key_callback_with_ui,
            mouse_button_callback_with_ui,
            scroll_callback,
            set_ui_callbacks,
        )

        app, engine, tree = loop.app, loop.engine, loop.tree

        # The background follows the theme unless the app asked for transparency
        # or supplied an explicit colour.
        background = app._bg_colour
        explicit_background = background == "transparent" or isinstance(background, tuple | list)
        self._theme_driven_bg = not explicit_background

        def _forward_char(ch: str) -> None:
            tree.ui_input(char=ch)

        def _forward_key(key_name: str, pressed: bool) -> None:
            tree.ui_input(key=key_name, pressed=pressed)

        def _forward_mouse(button: Any, pressed: bool) -> None:
            from simvx.core.input import Input

            tree.ui_input(mouse_pos=Input._mouse_pos, button=button, pressed=pressed)

        def _forward_motion(pos: tuple[float, float]) -> None:
            # Repeated reports of the same position are not forwarded.
            if pos == self._last_mouse_pos:
                return
            self._last_mouse_pos = pos
            tree.ui_input(mouse_pos=pos, button=None, pressed=False)

        def _forward_touch(finger_id: int, action: int, x: float, y: float) -> None:
            tree.touch_input(finger_id, action, x, y)

        set_ui_callbacks(
            char=_forward_char,
            key=_forward_key,
            mouse=_forward_mouse,
            motion=_forward_motion,
            scroll=_forward_key,  # scroll events use same key routing
            touch=_forward_touch,
            tree_propagate=tree.propagate_input,
        )

        engine.set_key_callback(key_callback_with_ui)
        engine.set_mouse_button_callback(mouse_button_callback_with_ui)
        engine.set_cursor_pos_callback(cursor_pos_callback)
        engine.set_scroll_callback(scroll_callback)
        engine.set_char_callback(char_callback)

        # Wire mouse capture to the OS cursor: apply any mode set before the window existed,
        # then route future Input.set_mouse_capture_mode() calls straight to the backend.
        from simvx.core.input import Input

        def _apply_capture(mode):
            return engine.set_mouse_capture(int(mode))

        engine.set_mouse_capture(int(Input.get_mouse_capture_mode()))
        Input._capture_mode_callback = _apply_capture

        win = engine._window
        if hasattr(win, "set_touch_callback"):
            from .input_adapter import touch_callback

            win.set_touch_callback(touch_callback)

        self._wire_clipboard(engine)

        # Store HiDPI content scale on theme (do NOT scale Draw2D base height).
        sx, _sy = engine.content_scale
        if sx > 1.01:
            from simvx.core.ui.theme import get_theme

            get_theme().ui_scale = sx
            get_theme()._sync_dicts()

        tree._platform_window = getattr(engine._window, "_window", None)

    def _wire_clipboard(self, engine: Any) -> None:
        """Register an SDL3 or GLFW clipboard backend; on failure only a debug message is logged."""

        def _as_text(val: Any) -> str:
            # Normalise a clipboard read: None -> "", bytes -> utf-8 text, anything else -> str().
            if val is None:
                return ""
            if isinstance(val, bytes):
                return val.decode("utf-8", errors="replace")
            return str(val)

        try:
            from simvx.core.ui.clipboard import set_backend as _set_clipboard

            from .app import _get_sdl3_type

            win = engine._window
            if isinstance(win, _get_sdl3_type()):
                import sdl3 as _sdl  # type: ignore[import-not-found]

                def _sdl3_copy(text):
                    return _sdl.SDL_SetClipboardText(text.encode())

                def _sdl3_paste() -> str:
                    return _as_text(_sdl.SDL_GetClipboardText())

                _set_clipboard(_sdl3_copy, _sdl3_paste)
            else:
                import glfw  # type: ignore[import-untyped]

                _win = win._window

                def _glfw_copy(text):
                    return glfw.set_clipboard_string(_win, text)

                def _glfw_paste() -> str:
                    return _as_text(glfw.get_clipboard_string(_win))

                _set_clipboard(_glfw_copy, _glfw_paste)
        except (ImportError, AttributeError) as exc:
            log.debug("Clipboard backend unavailable; using no-op clipboard: %s", exc, exc_info=True)

    def mid_tick(self, loop: FrameLoop) -> None:
        """Poll gamepads, sync the tree to the window size / cursor, and follow theme changes."""
        from simvx.core.input import Input

        from .input_adapter import poll_gamepads

        engine = loop.engine
        tree = loop.tree
        poll_gamepads(engine._window)

        window = engine._window
        if window:
            size = window.get_window_size()
            if tree.screen_size != (size[0], size[1]):
                tree.screen_size = (size[0], size[1])
            if hasattr(window, "get_cursor_pos"):
                cx, cy = window.get_cursor_pos()
                Input._mouse_pos = (cx, cy)

        if not self._theme_driven_bg:
            return
        from simvx.core.ui.theme import get_theme, theme_generation

        # Re-apply the clear colour only when the theme generation has moved on.
        gen = theme_generation()
        if gen == self._last_theme_gen:
            return
        self._last_theme_gen = gen
        c = get_theme().bg_black
        engine.clear_colour = [c[0], c[1], c[2], c[3]]  # type: ignore[attr-defined]

    def post_tick(self, loop: FrameLoop) -> None:
        """On a fresh ``mouse_1`` press, cast a LEFT-button pick at the current mouse position."""
        from simvx.core import MouseButton
        from simvx.core.input import Input

        if not Input._keys_just_pressed.get("mouse_1"):
            return
        loop.tree.input_cast(Input._mouse_pos, button=MouseButton.LEFT)

    def per_frame_telemetry(self, loop: FrameLoop) -> None:
        """Populate app telemetry every frame, but only on the synchronous path."""
        if loop.pipelined:
            return
        loop.app._populate_telemetry(frames_rendered=loop.frame_idx)
# ============================================================================
# FrameServices -- per-loop external request drain at the frame barrier
# ============================================================================
[docs]
class FrameService:
    """One unit of work, drained once per frame on the loop (main) thread.

    This is the marshalling contract external drivers need (PEP 768 socket
    bridge, editor tooling): a producer thread queues work, and the service runs
    on the single thread that owns the SceneTree, AT the post-tick barrier (tree
    fully ticked + Draw2D built, GPU handoff not yet started). Override
    :meth:`on_frame_service` and keep it bounded and non-blocking.
    """

    def on_frame_service(self, loop: FrameLoop) -> None:  # noqa: D401 - hook
        """Drain queued work on the loop thread; must not block the frame."""
        return None

    def on_loop_end(self, loop: FrameLoop) -> None:  # noqa: D401 - hook
        """Free resources when the owning loop tears down (close sockets, join threads)."""
        return None
[docs]
class FrameServices:
    """Ordered registry of :class:`FrameService` objects drained each frame.

    Loop-level (not integration-level) so every clock/sink/integration combo --
    windowed, headless, streaming, and the externally-driven agent/editor path --
    reaches the same barrier through ``FrameLoop._update`` with zero per-consumer
    wiring. Registration is the canonical way to attach a per-frame drain to a
    *running* loop; it is dependency-free (pure stdlib list).

    Services may call :meth:`add` / :meth:`remove` from inside their own
    callbacks: both :meth:`run` and :meth:`close` walk a snapshot of the
    registry, so such a change never causes another service to be skipped.
    """

    def __init__(self) -> None:
        self._services: list[FrameService] = []

    def add(self, service: FrameService) -> FrameService:
        """Register a service. Returns it for ``svc = loop.services.add(MyService())``."""
        self._services.append(service)
        return service

    def remove(self, service: FrameService) -> None:
        """Deregister a service (idempotent: removing an unknown service is a no-op)."""
        try:
            self._services.remove(service)
        except ValueError:
            # Not registered (or already removed): nothing to do.
            pass

    def __bool__(self) -> bool:
        """True when at least one service is registered."""
        return bool(self._services)

    def run(self, loop: FrameLoop) -> None:
        """Drain every registered service on the loop thread, in registration order.

        A failing service is fully logged and the remaining services still run; the
        loop must never be left half-drained or silently killed by one bad service.
        A service added during the drain first runs on the next frame; a service
        removed during the drain before its turn is not run.
        """
        # Iterate a snapshot: a service deregistering itself would otherwise shift
        # the live list under the iterator and skip the service that follows it.
        for service in tuple(self._services):
            if service not in self._services:
                continue  # removed earlier in this same drain
            try:
                service.on_frame_service(loop)
            except Exception:
                log.exception("FrameService %r raised during frame drain", service)

    def close(self, loop: FrameLoop) -> None:
        """Tell every service the loop is ending, then clear the registry.

        Called from FrameLoop._teardown so a registered bridge (its accept thread +
        bound socket) is shut down with the loop instead of leaking until process exit.
        Every service registered when ``close`` starts is notified, even if an
        earlier one deregisters itself or others from its ``on_loop_end``.
        """
        for service in tuple(self._services):
            try:
                service.on_loop_end(loop)
            except Exception:
                log.exception("FrameService %r raised during loop-end close", service)
        self._services.clear()
# ============================================================================
# FrameLoop -- owns the engine lifecycle + the one per-frame core
# ============================================================================
[docs]
class FrameLoop:
"""One per-frame engine drive seam, parameterised by clock + sink + integration."""
#: Set by run() before any frame method executes.
engine: Engine
def __init__(
    self,
    app: App,
    root_node: Any,
    *,
    clock: FrameClock,
    sink: FrameSink,
    integration: FrameIntegration,
    visible: bool,
    vsync: bool,
) -> None:
    """Store the collaborators and reset all per-run state; nothing is built yet."""
    # What the caller supplied.
    self.app = app
    self.root_node = root_node
    self.clock = clock
    self.sink = sink
    self.integration = integration
    self._visible = visible
    self._vsync = vsync

    #: Per-frame external request drain, serviced at the post-tick barrier. Empty
    #: (zero cost) unless a consumer registers a FrameService onto the running loop.
    self.services = FrameServices()

    # Run state, filled in later by the build / setup steps.
    self.tree: Any = None
    self.adapter: Any = None
    self.pipelined = False
    self.frame_idx = 0

    # Render-thread plumbing (stays None unless the run is pipelined).
    self._render_ring: Any = None
    self._render_thread: Any = None
    self._packet_idx = 0

    # 2D item-pipeline (design §2.6, P1.6): the only 2D renderer. Lazily
    # created the first frame: one RenderItemCache + ItemPublisher for the
    # main scene.
    self._item_cache: Any = None
    self._item_publisher: Any = None
    self._item_structure_version = -1
# ---------------------------------------------------------------- run
def _build(self) -> None:
    """Create the engine and scene tree, then register this loop (used by run() and begin())."""
    from simvx.core import Input, SceneTree

    from .draw2d import Draw2D
    from .engine import Engine

    app = self.app
    if self.integration.reset_globals:
        # Re-runnable paths start from clean app handles and global Input / Draw2D state.
        app._engine = None
        app._scene_adapter = None  # type: ignore[attr-defined]
        Input._reset()
        Draw2D._reset()

    engine_options = {
        "width": app.width,
        "height": app.height,
        "title": app.title,
        "backend": app._backend_name,
        "visible": self._visible,
        "vsync": self._vsync,
        # The target FPS is only passed on for a visible window.
        "target_fps": app._target_fps if self._visible else None,
    }
    engine = Engine(**engine_options)
    self.engine = app._engine = engine
    engine._multi_gpu_requested = app._multi_gpu
    self._apply_clear_colour()

    self.tree = SceneTree(screen_size=(app.width, app.height))
    app._init_audio_backend(self.tree)
    app.last_telemetry = {}
    self.integration.on_run_begin(self)

    # Publish the loop for in-process discovery, without duplicating an entry.
    if self not in _active_loops:
        _active_loops.append(self)
def _teardown(self) -> None:
    """Mirror of _build (shared by run() finally and end()).

    Deregisters the loop, closes registered services, notifies the integration,
    then releases the sub-viewports and shuts audio down. The last two steps run
    in a ``finally`` so a failing ``on_run_end`` hook cannot leave the audio
    backend running; the hook's exception still propagates to the caller.
    """
    if self in _active_loops:
        _active_loops.remove(self)
    try:
        # Shut down any registered services (e.g. the agent socket bridge) so their
        # threads/sockets do not outlive the loop.
        self.services.close(self)
        self.integration.on_run_end(self)
    finally:
        self.app._sub_viewports = None
        self.app._shutdown_audio(self.tree)
[docs]
def run(self) -> None:
    """Build the engine, then block driving frames until the loop stops.

    Teardown always runs once the build has succeeded, even if the engine raises.
    """
    self._build()
    try:
        hooks = {
            "callback": self._update,
            "setup": self._setup,
            "render": self._render_fn,
            "pre_render": self._pre_render_fn,
            "cleanup": self._cleanup,
            "pre_shutdown": self._stop_render_thread,
            "frame_driver": self._frame_driver,
        }
        self.engine.run(**hooks)
    finally:
        self._teardown()
# ----------------------------------------------------- external driving
[docs]
def begin(self) -> None:
    """Set up for external driving: build the engine, then let it create the window +
    device and run setup.

    Use together with :meth:`step_frame` / :meth:`capture` / :meth:`end`. Intended for
    an external (FixedStepClock) clock -- the agent live-session and editor viewport.
    """
    self._build()
    engine = self.engine
    engine.begin(
        setup=self._setup,
        render=self._render_fn,
        pre_render=self._pre_render_fn,
    )
[docs]
def step_frame(self, dt: float | None = None) -> bool:
    """Drive one frame through the engine and return the engine's result
    (False once the loop should stop).

    A non-None ``dt`` replaces the clock's step, but only for a :class:`FixedStepClock`.
    """
    if dt is not None:
        clock = self.clock
        if isinstance(clock, FixedStepClock):
            clock._dt = dt
    return self.engine.step(callback=self._update, frame_driver=self._frame_driver)
[docs]
def capture(self, *, scale: float = 1.0, region: tuple[int, int, int, int] | None = None) -> Any:
    """Read back the last drawn frame as an (H, W, 4) uint8 RGBA array.

    Optional ``region=[x1,y1,x2,y2]`` crops and ``scale`` (<1.0) nearest-downsamples
    at the source -- the cheap detail/cost levers from the design.

    Args:
        scale: Resampling factor applied after the crop; must be greater than zero.
            ``1.0`` returns the (cropped) frame untouched.
        region: Optional ``(x1, y1, x2, y2)`` crop, applied as a plain array slice.

    Returns:
        The captured, optionally cropped and resampled, image array.

    Raises:
        ValueError: If ``scale`` is zero, negative or NaN.
    """
    import numpy as np

    # Reject a non-positive (or NaN) scale before touching the GPU: it would turn
    # into invalid sample indices below and fail with an unrelated indexing error.
    if not scale > 0:
        raise ValueError(f"scale must be greater than 0, got {scale!r}")

    # When pipelined, the render thread shares the graphics queue + command pool and
    # guards them with renderer._frame_state_lock (see render_thread). A capture from
    # the loop thread (e.g. the agent bridge draining at the post-tick barrier) must
    # take that same lock or it races the in-flight present. Synchronous mode has no
    # render thread, so no lock is needed.
    if self.pipelined:
        with self.engine.renderer._frame_state_lock:
            img = self.engine.renderer.capture_frame()
    else:
        img = self.engine.renderer.capture_frame()

    if region is not None:
        x1, y1, x2, y2 = region
        img = img[y1:y2, x1:x2]

    if scale != 1.0 and img.size:
        h, w = img.shape[:2]
        # Output size is at least 1x1; each output pixel samples the nearest
        # source pixel at index/scale, clamped to the last row / column.
        nh, nw = max(1, int(h * scale)), max(1, int(w * scale))
        ys = np.minimum((np.arange(nh) / scale).astype(int), h - 1)
        xs = np.minimum((np.arange(nw) / scale).astype(int), w - 1)
        img = img[ys][:, xs]
    return img
[docs]
def end(self) -> None:
    """Tear down an externally-driven session started with :meth:`begin`.

    The loop-level teardown runs even when the engine's own shutdown raises.
    """
    try:
        self.engine.end(
            pre_shutdown=self._stop_render_thread,
            cleanup=self._cleanup,
        )
    finally:
        self._teardown()
def _apply_clear_colour(self) -> None:
app = self.app
if app._bg_colour == "transparent":
self.engine.clear_colour = [0.0, 0.0, 0.0, 0.0] # type: ignore[attr-defined]
elif isinstance(app._bg_colour, tuple | list):
self.engine.clear_colour = list(app._bg_colour) # type: ignore[attr-defined]
else:
from simvx.core.ui.theme import get_theme
c = get_theme().bg_black
self.engine.clear_colour = [c[0], c[1], c[2], c[3]] # type: ignore[attr-defined]
# ---------------------------------------------------------------- setup
def _setup(self) -> None:
# Setup hook handed to the engine (run() and begin() both pass it as ``setup=``).
# Wires the scene adapter, sub-viewport manager and scene tree to the engine,
# resolves whether the run is pipelined, then hands over to the integration
# and the clock. Statement order matters here; keep it as is.
from .renderer.sub_viewport import SubViewportManager
from .scene_adapter import SceneAdapter
app = self.app
engine = self.engine
renderer = engine.renderer
# The adapter is shared three ways: kept on the loop, on the app and on the engine.
self.adapter = SceneAdapter(engine, renderer)
app._scene_adapter = self.adapter # type: ignore[attr-defined] # Exposed for editor game viewport
app._sub_viewports = SubViewportManager(engine, self.adapter)
# Cross-link tree <-> app and engine -> tree / adapter.
self.tree._app = app
engine._scene_tree = self.tree # type: ignore[attr-defined]
engine._scene_adapter = self.adapter # type: ignore[attr-defined] # for ReflectionProbePass
# After a device-loss rebuild the engine creates a NEW Renderer; re-point the
# adapter + sub-viewports at it (else they keep submitting to the dead one).
engine._on_device_rebuilt = self._rebuild_scene_adapter # type: ignore[attr-defined]
# The root is attached before the pipelined decision, which is resolved from the tree.
self.tree.set_root(self.root_node)
self.pipelined = app._resolve_pipelined(self.tree)
if self.pipelined:
self._start_render_thread()
# Let a device-loss / healthy rebuild driven inside the engine quiesce
# and respawn the render thread on the quiescent main thread (R4).
engine._render_thread_controller = _RenderThreadController(self)
# Integration extras (input callbacks, clipboard, ...) come next; the clock's
# begin hook runs last.
self.integration.on_setup(self)
self.clock.on_begin(app)
# ---------------------------------------------------------------- frame
def _clock_phase(self, _root: Any, frame_dt: float, phase: str) -> None:
    """Route one scene phase to the clock.

    ``"physics"`` goes to the clock's ``step_physics``; every other phase goes to
    ``tick_logic``. The clock keeps ownership of its dt/accumulator and its
    physics-stepping policy; this adapter only exists so ``step_scene_logic`` can
    order the shared sequence. ``_root`` is accepted but not used.
    """
    clock = self.clock
    step = clock.step_physics if phase == "physics" else clock.tick_logic
    step(self.app, self.tree, frame_dt)
def _update(self) -> None:
# Per-frame callback handed to the engine (run() passes it as ``callback=``,
# step_frame() passes it to ``engine.step``). One call is one frame of the core:
# capture previous frame -> stop checks -> clock advance -> scene step ->
# Draw2D build -> services drain -> scene / item submit -> input rollover.
engine = self.engine
# Capture the frame drawn by the previous GPU frame (synchronous path; in
# pipelined mode the render thread captures via render_thread_capture()).
if self.frame_idx > 0 and not self.pipelined:
self.sink.capture_synchronous(self, self.frame_idx - 1)
# Clock-requested stop: drain the pipeline, let the integration observe the end
# of the loop, then flag the engine to stop.
if self.clock.should_stop(self.frame_idx):
self._drain_pipeline()
self.integration.on_loop_end(self)
engine._running = False
return
# Integration-requested stop (pre_tick returned False).
# NOTE(review): indentation is lost in this view of the file, so whether
# on_loop_end below is also guarded by ``frame_idx > 0`` cannot be confirmed here.
if not self.integration.pre_tick(self):
if self.frame_idx > 0:
self._drain_pipeline()
self.integration.on_loop_end(self)
engine._running = False
return
from simvx.core import Input, ScenePhases, step_scene_logic
from .draw2d import Draw2D
frame_dt = self.clock.advance(self.app)
# The physics->mid_tick->logic ORDER is the shared canonical sequence
# (simvx.core.scene_step). The clock keeps owning its dt/accumulator and
# physics-stepping policy, and mid_tick (gamepad + window-size sync) keeps
# its between-physics-and-logic slot; both are passed in as phase hooks so
# the phase order is byte-identical to the previous inline form.
step_scene_logic(
self.tree,
frame_dt,
phases=ScenePhases(
node_walk=self._clock_phase,
ui_events=lambda: self.integration.mid_tick(self),
),
)
# Rebuild the 2D draw state from scratch each frame, then let the tree render into it.
Draw2D._reset()
self.tree.render(Draw2D)
self.integration.post_tick(self)
# Post-tick barrier: tree fully ticked + Draw2D built, GPU handoff not started.
# External requests (PEP 768 socket bridge, editor tooling) are drained here on
# the loop thread so all SceneTree mutation stays single-threaded. No cost when
# nothing is registered.
if self.services:
self.services.run(self)
# NOTE(review): with indentation lost, it is not visible here which of the
# following statements are conditional on the synchronous (non-pipelined) path.
if not self.pipelined:
engine.renderer.begin_frame()
self.adapter.submit_scene(self.tree)
# 2D item pipeline (design §2.6, P1.6): publish the item view and bind
# it for ``render`` to submit.
item_view, item_camera = self._publish_item_view()
engine.renderer.set_item_view(item_view, item_camera)
# Roll the input state over to the next frame.
Input._end_frame()
Input._new_frame()
self.integration.per_frame_telemetry(self)
# frame_idx counts the frames this callback has completed.
self.frame_idx += 1
def _drain_pipeline(self) -> None:
    """Wait for the render thread to finish the most recently submitted frame.

    Does nothing unless the run is pipelined, a render thread exists and at
    least one frame has been produced.
    """
    if not self.pipelined:
        return
    worker = self._render_thread
    if worker is None or not self.frame_idx > 0:
        return
    worker.wait_for_frame(self.frame_idx - 1)
def _capture_telemetry(self) -> None:
    """Populate the app's telemetry with the frame count, waiting on occlusion results.

    With a live render thread (pipelined run) the call is made while holding the
    renderer's ``_frame_state_lock``; otherwise it is made directly.
    """
    app = self.app

    def _populate() -> None:
        app._populate_telemetry(frames_rendered=self.frame_idx, occlusion_wait=True)

    render_thread_live = self.pipelined and self._render_thread is not None
    if not render_thread_live:
        _populate()
        return
    with self.engine.renderer._frame_state_lock:
        _populate()
# ---------------------------------------------------------------- gpu hooks
def _publish_item_view(self) -> tuple[Any, tuple]:
    """Build and publish this frame's 2D item view (design §2.6).

    Returns ``(view, camera_affine)``. The per-scene ``RenderItemCache`` and
    ``ItemPublisher`` are created on first use. Per the design notes (§2.7),
    the cache skips a clean frame, patches in place when only some drawables
    are dirty, and re-collects on a structure / camera / theme change. The
    publisher is handed the cache ``epoch`` so that an in-place patch (same
    result object, advanced epoch) re-freezes the view, while an unchanged
    epoch republishes the same frozen view.
    """
    from .render2d import (
        ItemPublisher,
        RenderItemCache,
        ViewState,
        camera_affine_from_tree,
    )

    if self._item_cache is None:
        from simvx.core.ui.theme import theme_generation

        self._item_cache = RenderItemCache(theme_generation=theme_generation)
        self._item_publisher = ItemPublisher()

    tree = self.tree
    camera = getattr(tree, "_current_camera_2d", None)
    if camera is None:
        # No active 2D camera: only the viewport is specified.
        view_state = ViewState(viewport=tuple(map(int, tree._screen_size)))
    else:
        # A non-positive zoom falls back to 1.0; the same factor is used on both axes.
        zoom = float(camera.zoom) if camera.zoom > 0 else 1.0
        offset = (float(camera.current[0]), float(camera.current[1]))
        viewport = tuple(map(int, tree._screen_size))
        view_state = ViewState(
            offset=offset,
            zoom=(zoom, zoom),
            rotation=0.0,
            viewport=viewport,
        )

    structure_version = int(getattr(tree, "_structure_version", 0))
    cache = self._item_cache
    collected = cache.frame(tree.root, structure_version=structure_version, view=view_state)
    # The epoch is read after ``frame`` so a patch made by this call is visible
    # to the publisher.
    published = self._item_publisher.publish(collected, epoch=cache.epoch)
    return published, camera_affine_from_tree(tree)
def _frame_driver(self) -> None:
    """Drive the GPU side of one frame, synchronously or via the render thread.

    Synchronous path (not pipelined, or no ring / render thread): calls
    ``engine._draw_frame()`` and returns.

    Pipelined path: if the engine is flagged device-lost, attempts recovery
    and skips this frame. Otherwise, under the renderer's
    ``_frame_state_lock``, begins the frame, submits the scene, publishes the
    2D item view and extracts a render packet; the packet is then submitted
    to the ring. The loop is stopped (``engine._running = False``) when
    recovery fails, when the ring rejects the packet with ``RuntimeError``,
    or when the render thread is no longer alive.
    """
    from .renderer.render_packet import extract_render_packet
    engine = self.engine
    if not self.pipelined or self._render_ring is None or self._render_thread is None:
        # Pipelining off: synchronous GPU frame on the main thread (begin_frame +
        # submit_scene already ran in _update), byte-identical to the default path.
        engine._draw_frame()
        return
    # Pipelined device-loss recovery (R4): the render thread caught
    # VK_ERROR_DEVICE_LOST, flagged the engine, and exited (it cannot rebuild
    # the device it records on). Drive the rebuild HERE on the quiescent main
    # thread; ``_recover_device`` -> ``_rebuild_device`` quiesces + respawns the
    # render thread via the controller. Skip handing off this frame's packet
    # (its lists targeted the dead renderer); the next frame re-submits.
    if engine._device_lost:
        if not engine._recover_device():
            # Recovery failed: ask the loop to stop.
            engine._running = False
        return
    renderer = engine.renderer
    with renderer._frame_state_lock:
        renderer.begin_frame()
        self.adapter.submit_scene(self.tree)
        self.app._warn_pipelined_unpacketised(self.tree)
        # Publish the 2D item view inside the sync point (design §4): the
        # publisher freezes the immutable view the render thread reads.
        item_view, item_camera = self._publish_item_view()
        packet = extract_render_packet(
            renderer,
            self.tree,
            frame_index=self._packet_idx,
            sub_viewports=self.app._sub_viewports,
            item_view=item_view,
            item_camera=item_camera,
        )
    try:
        self._render_ring.submit(packet)
    except RuntimeError:
        # NOTE(review): presumably raised when the ring has been closed -- confirm
        # against RenderPacketRing.submit. Either way the loop is stopped.
        engine._running = False
        return
    # Only count packets that were actually handed to the ring.
    self._packet_idx += 1
    if not self._render_thread.alive:
        engine._running = False
def _pre_render_fn(self, cmd: Any) -> None:
    """Forward the pre-render hook to the integration's ``pre_render(self, cmd)``."""
    self.integration.pre_render(self, cmd)
def _default_pre_render(self, cmd: Any) -> None:
    """Reserve the main slice, then run the pre-pass work for this frame.

    Synchronous: render all SubViewports (when a manager and adapter exist)
    and capture reflection probes (when an adapter exists). Pipelined: replay
    each entry of the renderer's ``_packet_subviewport_srus`` through the
    adapter. Both paths finish with ``renderer.pre_render(cmd)``.
    """
    app = self.app
    renderer = self.engine.renderer
    renderer.reserve_main_slice()
    if self.pipelined:
        if self.adapter is not None:
            # A missing / empty SRU list simply replays nothing.
            for sru in renderer._packet_subviewport_srus or []:
                self.adapter.render_sru_from_plan(cmd, sru)
    else:
        sub_viewports = app._sub_viewports
        if sub_viewports is not None and self.adapter is not None:
            sub_viewports.render_all(cmd, self.tree)
        if self.adapter is not None:
            renderer.capture_reflection_probes(cmd, self.adapter)
    renderer.pre_render(cmd)
def _render_fn(self, cmd: Any, extent: Any) -> None:
    """Delegate to ``engine.renderer.render(cmd)``; ``extent`` is accepted but not used here."""
    self.engine.renderer.render(cmd)
def _cleanup(self) -> None:
    """Destroy the app's SubViewport manager, if one exists."""
    sub_viewports = self.app._sub_viewports
    if sub_viewports is None:
        return
    sub_viewports.destroy()
def _rebuild_scene_adapter(self) -> None:
    """Recreate the SceneAdapter + SubViewportManager against the current renderer.

    Installed as ``engine._on_device_rebuilt`` and run on the main thread from
    ``Engine._rebuild_device`` once the new Renderer exists. SceneAdapter keeps
    a hard reference to its renderer, so without this the adapter would keep
    feeding the destroyed renderer (blank frames after a rebuild). Building
    both objects from scratch mirrors first-time ``_setup``.
    """
    from .renderer.sub_viewport import SubViewportManager
    from .scene_adapter import SceneAdapter

    app = self.app
    engine = self.engine
    fresh_adapter = SceneAdapter(engine, engine.renderer)
    self.adapter = fresh_adapter
    # The app and the engine both hold a reference to the adapter.
    app._scene_adapter = engine._scene_adapter = fresh_adapter  # type: ignore[attr-defined]
    app._sub_viewports = SubViewportManager(engine, fresh_adapter)  # type: ignore[attr-defined]
def _start_render_thread(self) -> None:
    """Build a new packet ring + render thread on the live device and start it.

    Python threads cannot be restarted, so a brand-new thread is constructed
    every time -- at first setup and again after a device rebuild. The new
    thread re-establishes the command-pool + single-recorder invariant on the
    rebuilt device.
    """
    from .renderer.render_packet import RenderPacketRing
    from .renderer.render_thread import RenderThread

    engine = self.engine
    ring = RenderPacketRing(capacity=self.sink.ring_capacity)
    self._render_ring = ring
    thread = RenderThread(
        engine,
        engine.renderer,
        ring,
        capture=self.sink.render_thread_capture(),
    )
    self._render_thread = thread
    # A fresh ring/thread starts counting from zero, so the packet index is
    # reset to keep headless capture frame sequencing aligned with it.
    self._packet_idx = 0
    thread.start()
def _stop_render_thread(self) -> None:
    """Stop the render thread if one exists, then drop the thread and ring references."""
    active_thread = self._render_thread
    if active_thread is not None:
        active_thread.stop()
    self._render_thread = self._render_ring = None