# Source code for simvx.graphics.renderer.render_thread

"""Dedicated render thread driving the pipelined GPU frame (opt-in, default OFF).

In pipelined mode (``App(render_thread=True)`` or ``WorldEnvironment
render_mode='pipelined'``) the MAIN thread simulates frame N+1 while this thread
records + submits the GPU work for frame N. The split:

- MAIN THREAD: ``glfwPollEvents`` (window events must stay on main), physics +
  tick, Draw2D, ``adapter.submit_scene`` (building the renderer's per-frame
  submission lists), then ``extract_render_packet`` into a CPU
  :class:`~.render_packet.RenderPacket`, then ``ring.submit(packet)``. It issues
  ZERO GPU calls.
- RENDER THREAD (this driver): ``ring.acquire`` a packet, ``install_packet`` it
  onto the renderer's per-frame attributes (under the renderer's
  ``_frame_state_lock``), then run the engine's existing GPU frame body
  (``wait_and_reset`` fence, ``vkAcquireNextImageKHR``, record pre_render + the
  render pass, ``vkQueueSubmit``, ``vkQueuePresentKHR``, ``sync.advance``), then
  ``ring.release``.

Invariants enforced here (see report):
  (a) The main thread issues no GPU calls: all ``vkCmd*`` / acquire / submit /
      present run on this thread.
  (b) This thread is the ONLY writer of the GPU SSBOs (``_upload_transforms`` /
      ``reserve_main_slice`` run here from the packet) and ``wait_and_reset``
      gates reuse, so a single GPU SSBO is safe (no per-frame GPU ring).
  (c) The pre_render / render closures read the renderer's per-frame attributes,
      which ``install_packet`` has just bound to the PACKET's owned snapshot; the
      ``_frame_state_lock`` makes the install + record region mutually exclusive
      with the main thread's begin_frame + submit_scene, so the main thread never
      tears those attributes mid-record.
  (d) +1 frame latency is bounded by the 2-slot ring's backpressure.
  (e) No deadlock on quit: ``stop`` closes the ring (waking a producer blocked on
      backpressure and this consumer blocked on acquire) and joins.

Command pool: this thread REUSES the engine's single command pool / per-frame
command buffers. Vulkan requires one pool per *recording* thread; that holds here
because in pipelined mode the render thread is the ONLY thread that records (the
main thread issues zero GPU work, invariant (a)). No second pool is created.
"""

from __future__ import annotations

import logging
import threading
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from collections.abc import Callable

    from ..engine import Engine
    from .forward import Renderer
    from .render_packet import RenderPacket, RenderPacketRing

# Module-level logger, named after this module so render-thread diagnostics
# (crashes, join timeouts) can be filtered independently.
log = logging.getLogger(__name__)

# Public API of this module: only the render-thread driver is exported.
__all__ = ["RenderThread"]


class RenderThread:
    """Consumer thread: installs render packets and records + submits GPU frames.

    Args:
        engine: The :class:`Engine` owning the swapchain, queues, sync, and the
            per-frame command buffers. The render thread is the sole GPU caller.
        renderer: The forward :class:`Renderer` whose per-frame attributes a
            packet is installed onto before recording.
        ring: The :class:`RenderPacketRing` the main thread submits packets to.
        draw_frame: The engine's GPU-frame body to invoke per packet. Defaults to
            ``engine._draw_frame``. It records pre_render + the render pass (via
            the engine's pre_render / render callbacks), submits, and presents.
        capture: Optional headless-capture callback, invoked on the render
            thread as ``capture(frame_index, rgba)`` right after each frame.
    """

    def __init__(
        self,
        engine: Engine,
        renderer: Renderer,
        ring: RenderPacketRing,
        *,
        draw_frame: Callable[[], None] | None = None,
        capture: Callable[[int, Any], None] | None = None,
    ) -> None:
        self._engine = engine
        self._renderer = renderer
        self._ring = ring
        self._draw_frame = draw_frame or engine._draw_frame
        # Optional headless capture: called ON THIS THREAD right after a packet's
        # GPU frame is presented, with (frame_index, rgba_array). Keeping the
        # readback on the render thread preserves invariant (a) (the main thread
        # issues zero GPU calls) and avoids using the shared command pool from two
        # threads (the readback allocates a one-off cmd buffer from that pool).
        self._capture = capture
        self._thread = threading.Thread(target=self._run, name="simvx-render", daemon=True)
        # Exception that killed the render loop, if any; lets ``stop`` distinguish
        # a clean join from a thread that died (re-raised to the caller).
        self._error: BaseException | None = None
        # Number of frames fully submitted + presented, i.e. the latest finished
        # ``frame_index`` plus one. The headless capture path waits on this so it
        # reads a fully-recorded image, never a racy mid-frame one.
        self._frames_done = 0
        # Set (under ``_done_cond``) once the render loop has exited for ANY
        # reason, clean or crashed. ``Thread.is_alive()`` alone cannot signal
        # this to a waiter: it is still True while the thread runs its own
        # shutdown notification.
        self._exited = False
        self._done_cond = threading.Condition()

    def start(self) -> None:
        """Spawn the render thread."""
        self._thread.start()

    def _run(self) -> None:
        """Render-thread main loop: acquire, render, release until the ring closes."""
        try:
            while True:
                packet = self._ring.acquire()
                if packet is None:
                    # Ring closed and drained: clean shutdown.
                    return
                try:
                    self._render_one(packet)
                finally:
                    self._ring.release()
        except BaseException as exc:  # noqa: BLE001 - re-raised on join via stop()
            self._error = exc
            log.exception("render thread crashed; stopping pipelined loop")
            # Wake any producer blocked on backpressure so the main thread does
            # not hang waiting for a slot this dead thread will never free.
            self._ring.close()
        finally:
            # Wake every ``wait_for_frame`` caller on BOTH exit paths. A waiter
            # blocked with ``timeout=None`` on a frame index this thread will
            # never reach would otherwise hang forever (lost wakeup), after a
            # crash and after a clean shutdown alike. The flag is published
            # under the condition so the woken waiter is guaranteed to see it.
            with self._done_cond:
                self._exited = True
                self._done_cond.notify_all()

    def _render_one(self, packet: RenderPacket) -> None:
        """Install one packet and record + submit its GPU frame on this thread."""
        with self._renderer._frame_state_lock:
            self._renderer.install_packet(packet)
            self._draw_frame()
            # Capture (headless) on THIS thread, after present, while the frame's
            # image is still the last-presented one. The shared command pool is
            # only ever touched here, never from the main thread.
            if self._capture is not None:
                rgba = self._engine.capture_frame()
                self._capture(packet.frame_index, rgba)
        with self._done_cond:
            self._frames_done = packet.frame_index + 1
            self._done_cond.notify_all()

    def wait_for_frame(self, frame_index: int, timeout: float | None = None) -> bool:
        """Block until the GPU frame for ``frame_index`` has been submitted + presented.

        Used by the headless capture path to sequence ``capture_frame`` after the
        render thread has finished drawing the frame.

        Args:
            frame_index: Index of the frame to wait for.
            timeout: Overall deadline in seconds for the whole call (``None``
                waits indefinitely). Notifications for earlier frames do not
                restart the clock.

        Returns:
            ``True`` once ``_frames_done`` has passed ``frame_index``. ``False``
            on timeout, or when the render thread is not running (never started,
            or exited cleanly) and so will never produce the frame.

        Raises:
            BaseException: The render thread's captured exception if it crashed
                before reaching ``frame_index``: the awaited frame will never
                arrive, so the caller must learn promptly rather than block
                forever or silently proceed on a stale frame.
        """

        def _settled() -> bool:
            # True as soon as waiting any longer cannot change the outcome.
            return (
                self._frames_done > frame_index
                or self._error is not None
                or self._exited
                or not self._thread.is_alive()
            )

        with self._done_cond:
            # ``wait_for`` tracks one deadline across wake-ups, unlike calling
            # ``wait(timeout)`` in a loop, which restarts the timeout each time.
            self._done_cond.wait_for(_settled, timeout)
            if self._frames_done > frame_index:
                return True
            if self._error is not None:
                raise self._error
            return False

    def stop(self, timeout: float | None = 5.0) -> None:
        """Close the ring and join the thread; re-raise any thread exception.

        ``close`` wakes a producer blocked on backpressure AND this consumer
        blocked on ``acquire`` (invariant (e)). The thread drains remaining
        packets, then exits. Re-raises any exception the thread captured so a
        render-thread crash surfaces on the main thread rather than vanishing.

        Args:
            timeout: Seconds to wait for the thread to exit (``None`` waits
                indefinitely). A thread still alive afterwards is logged, not
                raised.
        """
        self._ring.close()
        if self._thread.is_alive():
            self._thread.join(timeout)
            if self._thread.is_alive():
                log.error("render thread did not exit within %.1fs", timeout or 0.0)
        if self._error is not None:
            # Hand the error over exactly once: a second ``stop`` is a no-op.
            err, self._error = self._error, None
            raise err

    @property
    def alive(self) -> bool:
        """Whether the underlying render thread is currently running."""
        return self._thread.is_alive()

    @property
    def frames_done(self) -> int:
        """Number of frames fully rendered so far (last finished index + 1)."""
        with self._done_cond:
            return self._frames_done