Source code for simvx.graphics.renderer.sub_viewport

"""SubViewport render-to-texture: offscreen targets for ``core.SubViewport`` nodes.

A :class:`~simvx.core.SubViewport` renders its own subtree (with its own
camera) into an offscreen texture that other nodes in the *main* scene can
sample: a "live monitor in the world", a security-camera feed, a minimap, a
render-to-texture portal, etc.

Architecture mirrors :class:`~simvx.graphics.renderer.game_viewport.GameViewportRenderer`
(the proven editor game-preview path): each SubViewport owns a
:class:`RenderTarget`, registered once as a *bindless texture* whose slot id
stays stable across resizes so consumers that captured ``subviewport.texture``
keep working. :class:`SubViewportManager` keys one renderer per SubViewport by
``id(node)``, creates it on first sight, debounce-resizes on size changes, and
unregisters on exit-tree / teardown.

Frame ordering: all SubViewports are rendered **flat**, before the main scene
pass, in :meth:`SubViewportManager.render_all` (driven from the engine
``pre_render`` callback). Each target finishes in ``SHADER_READ_ONLY`` so the
main pass can sample it the same frame. A SubViewport that samples *another*
SubViewport sees last frame's content (one-frame lag): topological ordering is
out of scope for v1.
"""

from __future__ import annotations

import logging
from typing import Any

import vulkan as vk

from .draw2d_pass import Draw2DPass
from .render_target import RenderTarget

log = logging.getLogger(__name__)

__all__ = ["SubViewportRenderer", "SubViewportManager"]


class SubViewportRenderer:
    """One offscreen render target for a single SubViewport node.

    Wraps a :class:`RenderTarget` and exposes the ``ready`` / ``width`` /
    ``height`` / ``begin_pass`` / ``end_pass`` / ``render_draw2d`` surface that
    :meth:`SceneAdapter.render_to_target` expects: the same contract
    :class:`GameViewportRenderer` satisfies.
    """

    # Class-level fallback clear colour (opaque black). Kept so instances built
    # without ``__init__`` still have a value; ``__init__`` gives every
    # instance its own list so no mutable state is shared between renderers.
    # The manager rebinds the per-instance value from the SubViewport's
    # ``transparent_bg`` flag before ``begin_pass``.
    _clear_colour: list[float] = [0.0, 0.0, 0.0, 1.0]

    def __init__(self, engine: Any) -> None:
        """Store the engine; no GPU objects are created until :meth:`create`."""
        self._engine = engine
        self._target: RenderTarget | None = None
        self._draw2d_pass: Draw2DPass | None = None
        self._texture_id: int = -1  # bindless slot, -1 while unregistered
        self._width: int = 0
        self._height: int = 0
        # Match the HDR offscreen pass format so the forward renderer's
        # pipelines (compiled against the HDR pass) are render-pass compatible.
        self._colour_format = vk.VK_FORMAT_R16G16B16A16_SFLOAT
        self._clear_colour = [0.0, 0.0, 0.0, 1.0]

    # ------------------------------------------------------------------ helpers

    def _build_target(self, width: int, height: int) -> None:
        """Record the new size and construct the colour+depth RenderTarget."""
        ctx = self._engine.ctx
        self._width = width
        self._height = height
        self._target = RenderTarget(
            ctx.device,
            ctx.physical_device,
            width,
            height,
            colour_format=self._colour_format,
            use_depth=True,
            samplable_depth=True,
            queue=ctx.graphics_queue,
            command_pool=ctx.command_pool,
        )

    def _build_draw2d_pass(self, width: int, height: int) -> None:
        """Create the Draw2D overlay pass against the current target's render pass."""
        text_pass = getattr(self._engine.renderer, "_text_pass", None)
        self._draw2d_pass = Draw2DPass(self._engine, text_pass=text_pass)
        self._draw2d_pass.setup(render_pass=self._target.render_pass, extent=(width, height))

    def _release_gpu_objects(self) -> None:
        """Destroy the Draw2D pass and the target (caller must idle the device first)."""
        if self._draw2d_pass is not None:
            self._draw2d_pass.cleanup()
            self._draw2d_pass = None
        if self._target is not None:
            self._target.destroy()
            self._target = None

    @staticmethod
    def _begin_full_target_pass(cmd: Any, rt: Any, render_pass: Any, clear_values: list | None) -> None:
        """Begin *render_pass* over the whole target and set a matching viewport/scissor."""

        def full_rect() -> Any:
            return vk.VkRect2D(
                offset=vk.VkOffset2D(x=0, y=0),
                extent=vk.VkExtent2D(width=rt.width, height=rt.height),
            )

        rp_begin = vk.VkRenderPassBeginInfo(
            renderPass=render_pass,
            framebuffer=rt.framebuffer,
            renderArea=full_rect(),
            clearValueCount=len(clear_values) if clear_values else 0,
            pClearValues=clear_values or None,
        )
        vk.vkCmdBeginRenderPass(cmd, rp_begin, vk.VK_SUBPASS_CONTENTS_INLINE)
        vk.vkCmdSetViewport(cmd, 0, 1, [vk.VkViewport(
            x=0.0, y=0.0,
            width=float(rt.width), height=float(rt.height),
            minDepth=0.0, maxDepth=1.0,
        )])
        vk.vkCmdSetScissor(cmd, 0, 1, [full_rect()])

    # --------------------------------------------------------------- lifecycle

    def create(self, width: int, height: int) -> None:
        """Create the offscreen target and register its colour view as bindless.

        A non-positive *width* or *height* is ignored. Calling this on a
        renderer that already owns resources releases them first, so the old
        target and bindless slot are not leaked.
        """
        if width < 1 or height < 1:
            return
        if self._target is not None or self._draw2d_pass is not None or self._texture_id >= 0:
            self.destroy()
        self._build_target(width, height)
        self._texture_id = self._engine.register_texture(self._target.colour_view)
        self._build_draw2d_pass(width, height)
        log.debug("SubViewportRenderer created %dx%d, texture_id=%d", width, height, self._texture_id)

    def resize(self, width: int, height: int) -> None:
        """Recreate the target at a new size, preserving the bindless slot.

        The slot id handed out as ``subviewport.texture`` stays stable; only the
        backing image view changes, so a Sprite2D / Material that captured the
        slot keeps sampling the live feed after a resize. A request for the
        current size, or for a non-positive size, is a no-op.
        """
        if (width, height) == (self._width, self._height) or width < 1 or height < 1:
            return
        old_slot = self._texture_id
        # The old target may still be referenced by in-flight work.
        vk.vkDeviceWaitIdle(self._engine.ctx.device)
        self._release_gpu_objects()
        self._build_target(width, height)
        if old_slot >= 0:
            self._engine.update_texture(old_slot, self._target.colour_view)
        else:
            self._texture_id = self._engine.register_texture(self._target.colour_view)
        self._build_draw2d_pass(width, height)

    def destroy(self) -> None:
        """Release the target, Draw2D pass, and the bindless slot.

        Safe to call repeatedly. The slot is unregistered even when the target
        is already gone (e.g. a resize whose target construction failed), so a
        half-torn-down renderer does not leak its bindless slot.
        """
        if self._target is not None or self._draw2d_pass is not None:
            vk.vkDeviceWaitIdle(self._engine.ctx.device)
            self._release_gpu_objects()
        if self._texture_id >= 0:
            self._engine.unregister_texture(self._texture_id)
            self._texture_id = -1
        self._width = 0
        self._height = 0

    # ------------------------------------------------------------------ passes

    def begin_pass(self, cmd: Any) -> None:
        """Begin the offscreen colour+depth pass (clear). No-op without a target."""
        rt = self._target
        if rt is None:
            return
        clear_values = [
            vk.VkClearValue(color=vk.VkClearColorValue(float32=self._clear_colour)),
            vk.VkClearValue(depthStencil=vk.VkClearDepthStencilValue(depth=1.0, stencil=0)),
        ]
        self._begin_full_target_pass(cmd, rt, rt.render_pass, clear_values)

    def end_pass(self, cmd: Any) -> None:
        """End the pass opened by :meth:`begin_pass`. No-op without a target."""
        if self._target is not None:
            vk.vkCmdEndRenderPass(cmd)

    def render_draw2d(self, cmd: Any, ops: list) -> None:
        """Overlay pre-extracted Draw2D ops on top of the 3D content (LOAD_OP_LOAD).

        Does nothing when there is no target, no Draw2D pass, or *ops* is empty.
        """
        rt = self._target
        if rt is None or not ops or self._draw2d_pass is None:
            return
        # The overlay render pass carries no clear values.
        self._begin_full_target_pass(cmd, rt, rt.overlay_render_pass, None)
        self._draw2d_pass.render(cmd, rt.width, rt.height, ops=ops)
        vk.vkCmdEndRenderPass(cmd)

    # -------------------------------------------------------------- properties

    @property
    def texture_id(self) -> int:
        """Bindless slot id of the colour view, or ``-1`` when unregistered."""
        return self._texture_id

    @property
    def width(self) -> int:
        """Current target width in pixels (``0`` before creation / after destroy)."""
        return self._width

    @property
    def height(self) -> int:
        """Current target height in pixels (``0`` before creation / after destroy)."""
        return self._height

    @property
    def ready(self) -> bool:
        """``True`` once a target exists and its colour view holds a bindless slot."""
        return self._target is not None and self._texture_id >= 0
class _SubTreeView:
    """Lightweight stand-in for a SceneTree, scoped to one SubViewport.

    :meth:`SceneAdapter.render_to_target` / :meth:`SceneAdapter.submit_scene`
    only read a small, fixed set of ``tree`` attributes. Building a full
    :class:`SceneTree` per SubViewport (lifecycle hooks, input tables, etc.)
    would be wasteful, so this object carries exactly those attributes and
    nothing else. ``root`` is the SubViewport node itself, so
    ``_collect_nodes`` walks only its children.
    """

    __slots__ = (
        "root",
        "_screen_size",
        "_structure_version",
        "play_viewport_rect",
        "_render_camera_override",
        "_current_camera_2d",
        "overlay_offset",
        "_app",
    )

    def __init__(self, root: Any, screen_size: tuple[float, float]) -> None:
        # Identity of the subtree and the size it renders at.
        self.root = root
        self._screen_size = screen_size

        # Optional hooks, all unset for an offscreen view.
        self._app = None
        self._current_camera_2d = None
        self._render_camera_override = None
        self.play_viewport_rect: tuple[float, float, float, float] | None = None
        self.overlay_offset = (0.0, 0.0)

        # -1 guarantees the first submit re-walks the subtree.
        self._structure_version = -1

    @property
    def screen_size(self) -> tuple[float, float]:
        """Size of the offscreen target this view renders into."""
        return self._screen_size
class SubViewportManager:
    """Owns one :class:`SubViewportRenderer` per live SubViewport node.

    Created once per :class:`App` run and invoked from the engine
    ``pre_render`` callback via :meth:`render_all`. Discovers SubViewports each
    frame by walking the main scene tree, lazily creating a target on first
    sight, debounce-resizing when ``SubViewport.size`` changes, and reaping
    targets whose nodes have left the tree.
    """

    # Resize debounce: only resize once the requested size has been stable for
    # this many consecutive frames (mirrors PlayMode.ensure_game_viewport_size).
    _RESIZE_DEBOUNCE = 6

    def __init__(self, engine: Any, adapter: Any) -> None:
        """Store the engine and scene adapter; no targets exist until the first render."""
        self._engine = engine
        self._adapter = adapter
        # id(node) -> renderer / view / debounce bookkeeping
        self._renderers: dict[int, SubViewportRenderer] = {}
        self._views: dict[int, _SubTreeView] = {}
        self._nodes: dict[int, Any] = {}  # keep a ref so id() stays unique while live
        self._pending_size: dict[int, tuple[int, int]] = {}
        self._stable_frames: dict[int, int] = {}
        # Monotonic stamp handed to each view's _structure_version before a
        # submit. The shared SceneAdapter caches its collected-node list by
        # (tree, _structure_version); without a unique stamp per view, two
        # views (both starting at -1) or a view and the main tree could collide
        # and one would render another tree's geometry. A fresh value each
        # render forces a correct re-walk per view.
        self._version_counter: int = 1_000_000

    def render_all(self, tree: Any) -> bool:
        """Render every SubViewport in *tree* into its offscreen target.

        Called from the engine ``pre_render`` callback, before the main scene
        pass. Each SubViewport renders its own subtree with its own camera.

        Each SubViewport renders on its **own dedicated one-time command
        buffer**, submitted and *waited on* before the next viewport (and before
        the main frame's command buffer). This is mandatory: the renderer keeps
        a single per-frame transform / material SSBO that ``render_to_target``
        overwrites. If two viewports (or a viewport and the main scene) shared
        one command buffer (executed together at submit), the last SSBO upload
        would win and earlier passes would render with the wrong transforms.
        Fully completing each offscreen pass first leaves its target in
        ``SHADER_READ_ONLY`` and frees the SSBO for the next pass to reuse.

        Returns ``True`` if at least one SubViewport rendered this frame: the
        caller must then re-submit the main scene, since ``render_to_target``
        cleared the renderer's per-frame submission lists.

        A SubViewport that samples *another* SubViewport sees the previous
        frame's content (one-frame lag); v1 does not topologically order them.
        """
        if tree is None or tree.root is None:
            return False

        # Lazy import keeps this module import-light and avoids a hard core dep
        # at module load (the type is only needed for isinstance discovery).
        from simvx.core import SubViewport

        from ..draw2d import Draw2D
        from ..gpu.memory import begin_single_time_commands, end_single_time_commands

        live = self._collect_subviewports(tree.root, SubViewport)
        self._reap(set(map(id, live)))
        if not live:
            return False

        ctx = self._engine.ctx
        rendered_any = False
        for node in live:
            key = id(node)
            w, h = int(node.size[0]), int(node.size[1])
            if w < 1 or h < 1:
                continue
            rend = self._ensure_renderer(key, node, w, h)
            if rend is None or not rend.ready:
                continue
            self._debounce_resize(key, rend, node, w, h)

            # Honour update_mode: "always" (default) renders every frame;
            # "once" renders a single frame then freezes; "disabled" never
            # renders (slot stays valid but stale).
            mode = getattr(node, "render_target_update_mode", "always")
            if mode == "disabled":
                node._texture_id = rend.texture_id
                continue
            if mode == "once" and getattr(node, "_svp_rendered_once", False):
                node._texture_id = rend.texture_id
                continue

            view = self._views[key]
            self._version_counter += 1
            view._structure_version = self._version_counter
            view._screen_size = (float(rend.width), float(rend.height))
            view._current_camera_2d = self._find_camera_2d(node)
            camera_3d = self._find_camera_3d(node)

            rend._clear_colour = (
                [0.0, 0.0, 0.0, 0.0] if getattr(node, "transparent_bg", False) else [0.0, 0.0, 0.0, 1.0]
            )

            # Extract the subtree's own Draw2D overlay in isolation so it
            # doesn't leak into the main scene's Draw2D op list. The ops are
            # copied while still inside the isolated scope.
            sub_ops: list = []
            with Draw2D._isolated():
                node._draw_recursive(Draw2D)
                sub_ops = list(Draw2D._ops)

            # Each SubViewport renders + submits + waits on its OWN one-time
            # command buffer. This is required (not just for the main pass):
            # the renderer's single per-frame transform / material SSBO is
            # rewritten by every render_to_target, so two SubViewports sharing
            # one command buffer would have the second's SSBO upload corrupt the
            # first's already-recorded (but not-yet-executed) draws.
            cmd = begin_single_time_commands(ctx.device, ctx.command_pool)
            self._adapter.render_to_target(
                cmd,
                rend,
                view,
                camera=camera_3d,
                screen_size=(float(rend.width), float(rend.height)),
                draw2d_ops=sub_ops or None,
            )
            end_single_time_commands(ctx.device, ctx.graphics_queue, ctx.command_pool, cmd)

            node._texture_id = rend.texture_id
            node._svp_rendered_once = True
            rendered_any = True
        return rendered_any

    def _ensure_renderer(self, key: int, node: Any, w: int, h: int) -> SubViewportRenderer | None:
        """Return the renderer for *key*, creating and registering it on first sight.

        Returns ``None`` when creation did not yield a ready renderer; nothing
        is recorded in that case, so creation is attempted again next frame.
        """
        rend = self._renderers.get(key)
        if rend is not None:
            return rend
        rend = SubViewportRenderer(self._engine)
        rend.create(w, h)
        if not rend.ready:
            rend.destroy()
            return None
        self._renderers[key] = rend
        self._views[key] = _SubTreeView(node, (float(w), float(h)))
        self._nodes[key] = node
        self._pending_size[key] = (w, h)
        self._stable_frames[key] = 0
        node._texture_id = rend.texture_id
        return rend

    def _debounce_resize(self, key: int, rend: SubViewportRenderer, node: Any, w: int, h: int) -> None:
        """Resize *rend* once the requested size has been stable for ``_RESIZE_DEBOUNCE`` frames."""
        target = (max(w, 1), max(h, 1))
        if (rend.width, rend.height) == target:
            # Already at the requested size: reset the debounce state.
            self._pending_size[key] = target
            self._stable_frames[key] = 0
            return
        if target != self._pending_size.get(key):
            # New request: restart the stability count.
            self._pending_size[key] = target
            self._stable_frames[key] = 0
        else:
            self._stable_frames[key] += 1
        if self._stable_frames[key] >= self._RESIZE_DEBOUNCE:
            rend.resize(*target)
            # Slot is stable across resize, but re-publish defensively.
            node._texture_id = rend.texture_id
            self._stable_frames[key] = 0

    def _reap(self, live_keys: set[int]) -> None:
        """Destroy renderers whose SubViewport has left the tree."""
        for key in [k for k in self._renderers if k not in live_keys]:
            self._renderers.pop(key).destroy()
            self._views.pop(key, None)
            self._nodes.pop(key, None)
            self._pending_size.pop(key, None)
            self._stable_frames.pop(key, None)

    @staticmethod
    def _collect_subviewports(root: Any, sub_viewport_type: type) -> list:
        """Find all SubViewports under *root*, but do NOT descend into them.

        A SubViewport's children belong to its own offscreen subtree, never the
        main pass, so we stop the walk at each SubViewport. *root* itself is
        always walked through, and is appended last when it is a SubViewport.
        """
        found: list = []
        stack = [root]
        while stack:
            node = stack.pop()
            if node is not root and isinstance(node, sub_viewport_type):
                found.append(node)
                continue  # do not descend: children render offscreen
            stack.extend(node.children)
        # Root itself may be a SubViewport (unusual but valid). The loop never
        # appends root, so no duplicate check is needed.
        if isinstance(root, sub_viewport_type):
            found.append(root)
        return found

    @staticmethod
    def _find_camera(node: Any, camera_type: type) -> Any:
        """Return the first visible *camera_type* strictly below *node*, or ``None``."""
        for child in node.children:
            cam = SubViewportManager._first_of_type(child, camera_type)
            if cam is not None:
                return cam
        return None

    @staticmethod
    def _find_camera_3d(node: Any) -> Any:
        """Return the first visible ``Camera3D`` in *node*'s subtree, or ``None``."""
        from simvx.core import Camera3D

        return SubViewportManager._find_camera(node, Camera3D)

    @staticmethod
    def _find_camera_2d(node: Any) -> Any:
        """Return the first visible ``Camera2D`` in *node*'s subtree, or ``None``."""
        from simvx.core import Camera2D

        return SubViewportManager._find_camera(node, Camera2D)

    @staticmethod
    def _first_of_type(node: Any, typ: type) -> Any:
        """Return the first visible instance of *typ* in pre-order, or ``None``.

        *node* itself is considered first, then its descendants depth-first in
        declaration order. A node counts as visible unless it has a falsy
        ``_visible_in_hierarchy`` attribute.
        """
        stack = [node]
        while stack:
            n = stack.pop()
            if isinstance(n, typ) and getattr(n, "_visible_in_hierarchy", True):
                return n
            # Push children reversed so the LIFO stack pops them in declaration
            # order; pushing them forward would visit the last sibling first.
            stack.extend(reversed(tuple(n.children)))
        return None

    def destroy(self) -> None:
        """Tear down all targets (call on app shutdown)."""
        for rend in self._renderers.values():
            rend.destroy()
        self._renderers.clear()
        self._views.clear()
        self._nodes.clear()
        self._pending_size.clear()
        self._stable_frames.clear()