"""Dedicated render thread driving the pipelined GPU frame (opt-in, default OFF).
In pipelined mode (``App(render_thread=True)`` or ``WorldEnvironment
render_mode='pipelined'``) the MAIN thread simulates frame N+1 while this thread
records + submits the GPU work for frame N. The split:
- MAIN THREAD: ``glfwPollEvents`` (window events must stay on main), physics +
tick, Draw2D, ``adapter.submit_scene`` (building the renderer's per-frame
submission lists), then ``extract_render_packet`` into a CPU
:class:`~.render_packet.RenderPacket`, then ``ring.submit(packet)``. It issues
ZERO GPU calls.
- RENDER THREAD (this driver): ``ring.acquire`` a packet, ``install_packet`` it
onto the renderer's per-frame attributes (under the renderer's
``_frame_state_lock``), then run the engine's existing GPU frame body
(``wait_and_reset`` fence, ``vkAcquireNextImageKHR``, record pre_render + the
render pass, ``vkQueueSubmit``, ``vkQueuePresentKHR``, ``sync.advance``), then
``ring.release``.
Invariants enforced here (see report):
(a) The main thread issues no GPU calls: all ``vkCmd*`` / acquire / submit /
present run on this thread.
(b) This thread is the ONLY writer of the GPU SSBOs (``_upload_transforms`` /
``reserve_main_slice`` run here from the packet) and ``wait_and_reset``
gates reuse, so a single GPU SSBO is safe (no per-frame GPU ring).
(c) The pre_render / render closures read the renderer's per-frame attributes,
which ``install_packet`` has just bound to the PACKET's owned snapshot; the
``_frame_state_lock`` makes the install + record region mutually exclusive
with the main thread's begin_frame + submit_scene, so the main thread never
tears those attributes mid-record.
(d) +1 frame latency is bounded by the 2-slot ring's backpressure.
(e) No deadlock on quit: ``stop`` closes the ring (waking a producer blocked on
backpressure and this consumer blocked on acquire) and joins.
Command pool: this thread REUSES the engine's single command pool / per-frame
command buffers. Vulkan requires one pool per *recording* thread; that holds here
because in pipelined mode the render thread is the ONLY thread that records (the
main thread issues zero GPU work, invariant (a)). No second pool is created.
"""
from __future__ import annotations
import logging
import threading
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from collections.abc import Callable
from ..engine import Engine
from .forward import Renderer
from .render_packet import RenderPacket, RenderPacketRing
log = logging.getLogger(__name__)
__all__ = ["RenderThread"]
[docs]
class RenderThread:
    """Consumer thread: installs render packets and records + submits GPU frames.

    Args:
        engine: The :class:`Engine` owning the swapchain, queues, sync, and the
            per-frame command buffers. The render thread is the sole GPU caller.
        renderer: The forward :class:`Renderer` whose per-frame attributes a
            packet is installed onto before recording.
        ring: The :class:`RenderPacketRing` the main thread submits packets to.
        draw_frame: The engine's GPU-frame body to invoke per packet. Defaults to
            ``engine._draw_frame``. It records pre_render + the render pass (via
            the engine's pre_render / render callbacks), submits, and presents.
        capture: Optional headless-capture callback. When given, it is invoked
            on the render thread as ``capture(frame_index, rgba)`` after each
            packet's frame has been drawn, where ``rgba`` is whatever
            ``engine.capture_frame()`` returned.
    """

    def __init__(
        self,
        engine: Engine,
        renderer: Renderer,
        ring: RenderPacketRing,
        *,
        draw_frame: Callable[[], None] | None = None,
        capture: Callable[[int, Any], None] | None = None,
    ) -> None:
        self._engine = engine
        self._renderer = renderer
        self._ring = ring
        # Explicit ``is None`` test (not ``or``) so a caller-supplied callable is
        # always honoured, even one that happens to evaluate as falsy.
        self._draw_frame = draw_frame if draw_frame is not None else engine._draw_frame
        # Optional headless capture: called ON THIS THREAD right after a packet's
        # GPU frame is presented, with (frame_index, rgba_array). Keeping the
        # readback on the render thread preserves invariant (a) (the main thread
        # issues zero GPU calls) and avoids using the shared command pool from two
        # threads (the readback allocates a one-off cmd buffer from that pool).
        self._capture = capture
        self._thread = threading.Thread(target=self._run, name="simvx-render", daemon=True)
        # Exception that killed the loop, if any; ``stop`` re-raises it so the
        # caller can tell a clean join from a crashed thread.
        self._error: BaseException | None = None
        # One past the latest frame_index whose GPU frame this thread has fully
        # processed. ``wait_for_frame`` blocks on this counter.
        self._frames_done = 0
        # Set (under ``_done_cond``) once the loop has exited for ANY reason.
        # ``Thread.is_alive()`` is still True while ``_run`` is unwinding, so
        # waiters need this flag to observe a clean shutdown reliably.
        self._exited = False
        self._done_cond = threading.Condition()

    def start(self) -> None:
        """Spawn the render thread."""
        self._thread.start()

    def _run(self) -> None:
        """Thread body: render packets until the ring is closed and drained."""
        try:
            while True:
                packet = self._ring.acquire()
                if packet is None:
                    # Ring closed and drained: clean shutdown.
                    return
                try:
                    self._render_one(packet)
                finally:
                    # Always hand the slot back, even if rendering raised.
                    self._ring.release()
        except BaseException as exc:  # noqa: BLE001 - re-raised on join via stop()
            self._error = exc
            log.exception("render thread crashed; stopping pipelined loop")
            # Wake any producer blocked on backpressure so the main thread does
            # not hang waiting for a slot this dead thread will never free.
            self._ring.close()
        finally:
            # Wake every ``wait_for_frame`` caller on BOTH exit paths. The clean
            # ``return`` above skips any code placed after the try statement, so
            # the notify must live in ``finally``; otherwise a waiter blocked
            # with ``timeout=None`` on a frame that will never be produced hangs
            # forever (lost wakeup). ``_error`` is assigned before this point, so
            # a waiter woken by a crash sees the exception.
            with self._done_cond:
                self._exited = True
                self._done_cond.notify_all()

    def _render_one(self, packet: RenderPacket) -> None:
        """Install one packet and record + submit its GPU frame on this thread."""
        # NOTE(review): the capture step is kept inside the frame-state lock (the
        # wider exclusion); confirm this matches the intended lock scope.
        with self._renderer._frame_state_lock:
            self._renderer.install_packet(packet)
            self._draw_frame()
            # Capture (headless) on THIS thread, after present, while the frame's
            # image is still the last-presented one. The shared command pool is
            # only ever touched here, never from the main thread.
            if self._capture is not None:
                rgba = self._engine.capture_frame()
                self._capture(packet.frame_index, rgba)
        with self._done_cond:
            self._frames_done = packet.frame_index + 1
            self._done_cond.notify_all()

    def wait_for_frame(self, frame_index: int, timeout: float | None = None) -> bool:
        """Block until the GPU frame for ``frame_index`` has been processed.

        Args:
            frame_index: Index of the frame to wait for.
            timeout: Overall upper bound in seconds for the whole call (not per
                wakeup); ``None`` waits indefinitely.

        Returns:
            ``True`` once ``_frames_done`` has passed ``frame_index``. ``False``
            if the timeout elapsed first, or if the render thread is not running
            (never started, or exited cleanly) without reaching that frame.

        Raises:
            BaseException: The render thread's captured exception, if it crashed
                before reaching ``frame_index``: the awaited frame will never
                arrive, so the caller learns promptly instead of blocking or
                silently proceeding on a stale frame.
        """

        def settled() -> bool:
            # True when waiting any longer cannot change the outcome.
            return (
                self._frames_done > frame_index
                or self._error is not None
                or self._exited
                or not self._thread.is_alive()
            )

        with self._done_cond:
            # ``wait_for`` applies ``timeout`` as one deadline across all
            # wakeups, so unrelated frame completions do not restart the clock.
            self._done_cond.wait_for(settled, timeout)
            if self._frames_done > frame_index:
                return True
            if self._error is not None:
                raise self._error
            return False

    def stop(self, timeout: float | None = 5.0) -> None:
        """Close the ring and join the thread; re-raise any thread exception.

        ``close`` wakes a producer blocked on backpressure AND this consumer
        blocked on ``acquire`` (invariant (e)). The thread drains remaining
        packets, then exits. Re-raises any exception the thread captured so a
        render-thread crash surfaces on the main thread rather than vanishing.

        Args:
            timeout: Seconds to wait for the thread to exit; ``None`` waits
                indefinitely. If the thread is still running afterwards an error
                is logged and the call returns without raising for that.
        """
        self._ring.close()
        if self._thread.is_alive():
            self._thread.join(timeout)
            if self._thread.is_alive():
                log.error("render thread did not exit within %.1fs", timeout or 0.0)
        if self._error is not None:
            # Clear before raising so a second ``stop`` does not re-raise it.
            err, self._error = self._error, None
            raise err

    @property
    def alive(self) -> bool:
        """Whether the underlying thread is currently running."""
        return self._thread.is_alive()

    @property
    def frames_done(self) -> int:
        """One past the latest completed ``frame_index`` (0 before any frame)."""
        with self._done_cond:
            return self._frames_done